From 702dcb0423f40c44d69fe0c038fb52840a57f284 Mon Sep 17 00:00:00 2001 From: Ali Khadivi Date: Mon, 6 Oct 2025 22:44:09 +0330 Subject: [PATCH 1/6] feat(messages): improve mention parsing, add read-all-mentions & logout handler --- .../Aggregates/Dialog/DialogAggregate.cs | 4 +- .../Dialog/ReadMentionCommandHandler.cs | 2 +- .../Commands/Dialog/ReadMentionCommand.cs | 4 +- .../Sagas/SendMessageSaga.cs | 10 + .../EventHandlers/LogOutEventHandler.cs | 19 + ...elegramMessengerCommandServerExtensions.cs | 1 + .../Messages/ReadMentionsHandler.cs | 30 +- .../Messages/SendMessageHandler.cs | 71 +-- .../Services/Impl/MessageAppService.cs | 433 +++++++++++++----- .../Services/Impl/UsernameHelper.cs | 78 +++- .../Services/Interfaces/IMessageAppService.cs | 3 + .../Services/Interfaces/IUsernameHelper.cs | 1 + .../Impl/DialogReadModel.cs | 20 +- .../Impl/MessageReadModel.cs | 30 +- 14 files changed, 527 insertions(+), 179 deletions(-) create mode 100644 source/src/MyTelegram.Messenger.CommandServer/EventHandlers/LogOutEventHandler.cs diff --git a/source/src/MyTelegram.Domain/Aggregates/Dialog/DialogAggregate.cs b/source/src/MyTelegram.Domain/Aggregates/Dialog/DialogAggregate.cs index 372b81f82..6d0dec9c1 100644 --- a/source/src/MyTelegram.Domain/Aggregates/Dialog/DialogAggregate.cs +++ b/source/src/MyTelegram.Domain/Aggregates/Dialog/DialogAggregate.cs @@ -156,10 +156,10 @@ int date toPeer, date)); } - public void ReadMention(int messageId) + public void ReadMention(int messageId, bool readAllMentions = false) { Specs.AggregateIsCreated.ThrowDomainErrorIfNotSatisfied(this); - var unreadMentionsCount = _state.UnreadMentionsCount - 1; + var unreadMentionsCount = readAllMentions ? 0 : _state.UnreadMentionsCount - 1; if (unreadMentionsCount < 0) { unreadMentionsCount = 0; diff --git a/source/src/MyTelegram.Domain/CommandHandlers/Dialog/ReadMentionCommandHandler.cs b/source/src/MyTelegram.Domain/CommandHandlers/Dialog/ReadMentionCommandHandler.cs index 6a9195ec8..107cf368e 100644 --- a/source/src/MyTelegram.Domain/CommandHandlers/Dialog/ReadMentionCommandHandler.cs +++ b/source/src/MyTelegram.Domain/CommandHandlers/Dialog/ReadMentionCommandHandler.cs @@ -4,7 +4,7 @@ public class ReadMentionCommandHandler : CommandHandler(aggregateId) { public long OwnerUserId { get; } = ownerUserId; @@ -8,6 +8,8 @@ public class ReadMentionCommand(DialogId aggregateId, long ownerUserId, int mess //public long ToPeerId { get; } public int MessageId { get; } = messageId; + public bool ReadAllMentions { get; } = readAllMentions; + /*long toPeerId,*/ //ToPeerId = toPeerId; } \ No newline at end of file diff --git a/source/src/MyTelegram.Domain/Sagas/SendMessageSaga.cs b/source/src/MyTelegram.Domain/Sagas/SendMessageSaga.cs index d6d2e6990..b2ba862e3 100644 --- a/source/src/MyTelegram.Domain/Sagas/SendMessageSaga.cs +++ b/source/src/MyTelegram.Domain/Sagas/SendMessageSaga.cs @@ -25,6 +25,7 @@ public class SendMessageSaga : MyInMemoryAggregateSaga { private readonly IIdGenerator _idGenerator; + private bool _draftCleared; private readonly SendMessageSagaState _state = new(); public SendMessageSaga(SendMessageSagaId id, IEventStore eventStore, IIdGenerator idGenerator) : base(id, eventStore) { @@ -121,6 +122,15 @@ public async Task HandleAsync(IDomainEvent, + ITransientDependency +{ + public async Task HandleEventAsync(UserLoggedOutEvent eventData) + { + var command = new UnRegisterDeviceForAuthKeyCommand( + DeviceId.Create(eventData.PermAuthKeyId), + eventData.PermAuthKeyId, + eventData.TempAuthKeyId); + await commandBus.PublishAsync(command, CancellationToken.None); + } +} diff --git a/source/src/MyTelegram.Messenger.CommandServer/Extensions/MyTelegramMessengerCommandServerExtensions.cs b/source/src/MyTelegram.Messenger.CommandServer/Extensions/MyTelegramMessengerCommandServerExtensions.cs index d2eb05018..291a13baa 100644 --- a/source/src/MyTelegram.Messenger.CommandServer/Extensions/MyTelegramMessengerCommandServerExtensions.cs +++ b/source/src/MyTelegram.Messenger.CommandServer/Extensions/MyTelegramMessengerCommandServerExtensions.cs @@ -22,6 +22,7 @@ public static void AddEventHandlers(this IServiceCollection services) services.AddSubscription(); services.AddSubscription(); + services.AddSubscription(); } public static void AddMyTelegramMessengerCommandServer(this IServiceCollection services, diff --git a/source/src/MyTelegram.Messenger/Handlers/LatestLayer/Messages/ReadMentionsHandler.cs b/source/src/MyTelegram.Messenger/Handlers/LatestLayer/Messages/ReadMentionsHandler.cs index 54719c08d..46295ba60 100644 --- a/source/src/MyTelegram.Messenger/Handlers/LatestLayer/Messages/ReadMentionsHandler.cs +++ b/source/src/MyTelegram.Messenger/Handlers/LatestLayer/Messages/ReadMentionsHandler.cs @@ -10,11 +10,35 @@ /// 400 PEER_ID_INVALID The provided peer id is invalid. /// See /// -internal sealed class ReadMentionsHandler : RpcResultObjectHandler +internal sealed class ReadMentionsHandler( + ICommandBus commandBus, + IPeerHelper peerHelper, + IAccessHashHelper accessHashHelper, + IQueryProcessor queryProcessor, + IPtsHelper ptsHelper) + : RpcResultObjectHandler { - protected override Task HandleCoreAsync(IRequestInput input, + protected override async Task HandleCoreAsync(IRequestInput input, MyTelegram.Schema.Messages.RequestReadMentions obj) { - throw new NotImplementedException(); + await accessHashHelper.CheckAccessHashAsync(input, obj.Peer); + var peer = peerHelper.GetPeer(obj.Peer, input.UserId); + var dialogId = DialogId.Create(input.UserId, peer); + + var dialogReadModel = await queryProcessor.ProcessAsync(new GetDialogByIdQuery(dialogId.Value)); + + if (dialogReadModel != null && dialogReadModel.UnreadMentionsCount > 0) + { + var messageId = obj.TopMsgId ?? dialogReadModel.TopMessage; + var command = new ReadMentionCommand(dialogId, input.UserId, messageId, true); + await commandBus.PublishAsync(command); + } + + return new TAffectedHistory + { + Pts = ptsHelper.GetCachedPts(input.UserId), + PtsCount = 0, + Offset = 0 + }; } } diff --git a/source/src/MyTelegram.Messenger/Handlers/LatestLayer/Messages/SendMessageHandler.cs b/source/src/MyTelegram.Messenger/Handlers/LatestLayer/Messages/SendMessageHandler.cs index b7797b5ac..bc4769d5b 100644 --- a/source/src/MyTelegram.Messenger/Handlers/LatestLayer/Messages/SendMessageHandler.cs +++ b/source/src/MyTelegram.Messenger/Handlers/LatestLayer/Messages/SendMessageHandler.cs @@ -72,25 +72,35 @@ internal sealed class SendMessageHandler( : RpcResultObjectHandler { protected override async Task HandleCoreAsync(IRequestInput input, - RequestSendMessage obj) + RequestSendMessage obj) { await accessHashHelper.CheckAccessHashAsync(input, obj.Peer); await accessHashHelper.CheckAccessHashAsync(input, obj.SendAs); - var media = await ProcessUrlsInMessageAsync(obj); + + var media = await messageAppService.CreateInvitePreviewIfAnyAsync( + obj.Message, + options.Value.JoinChatDomain); + if (obj.Message.StartsWith("/")) { - obj.Entities ??= []; - obj.Entities.Add(new TMessageEntityBotCommand + var match = Regex.Match(obj.Message, @"^/([A-Za-z0-9_]{1,64})\b"); + + if (match.Success) { - Length = obj.Message.Length, - Offset = 0 - }); + obj.Entities ??= []; + obj.Entities.Add(new TMessageEntityBotCommand + { + Offset = 0, + Length = match.Length // includes leading slash + }); + } } int? topMsgId = null; var sendAs = peerHelper.GetPeer(obj.SendAs, input.UserId); - var sendMessageInput = new SendMessageInput(input.ToRequestInfo(), + var sendMessageInput = new SendMessageInput( + input.ToRequestInfo(), input.UserId, peerHelper.GetPeer(obj.Peer, input.UserId), obj.Message, @@ -112,49 +122,4 @@ protected override async Task HandleCoreAsync(IRequestInput input, return null!; } - private async Task ProcessUrlsInMessageAsync(RequestSendMessage obj) - { - var pattern = @"(?:^|\s)(https?://[^\s]+)(?=\s|$)"; - var pattern2 = @$"{options.Value.JoinChatDomain}/\+([\S]{{16}})"; - var matches = Regex.Matches(obj.Message, pattern); - var isInviteUrlAdded = false; - TMessageMediaWebPage? media = null; - foreach (Match match in matches) - { - obj.Entities ??= []; - var url = match.Groups[1].Value; - var m2 = Regex.Match(url, pattern2); - if (m2.Success && !isInviteUrlAdded) - { - var link = m2.Groups[1].Value; - var chatInvite = await queryProcessor.ProcessAsync(new GetChatInviteByLinkQuery(link)); - if (chatInvite != null) - { - var channelReadModel = await channelAppService.GetAsync(chatInvite.PeerId); - // Super group/Public channel - if (!channelReadModel.Broadcast || - (channelReadModel.Broadcast && !string.IsNullOrEmpty(channelReadModel.UserName))) - { - media = new TMessageMediaWebPage - { - Webpage = new Schema.TWebPage - { - Id = Random.Shared.NextInt64(), - Url = $"{options.Value.JoinChatDomain}/+{link}", - DisplayUrl = $"{options.Value.JoinChatDomain}/+{link}", - Type = channelReadModel.Broadcast ? "telegram_channel" : "telegram_megagroup", - SiteName = "MyTelegram", - Title = channelReadModel.Title, - Description = $"Join this group on MyTelegram.", - } - }; - } - - isInviteUrlAdded = true; - } - } - } - - return media; - } } diff --git a/source/src/MyTelegram.Messenger/Services/Impl/MessageAppService.cs b/source/src/MyTelegram.Messenger/Services/Impl/MessageAppService.cs index 6f8ddf33a..2d3adbfd7 100644 --- a/source/src/MyTelegram.Messenger/Services/Impl/MessageAppService.cs +++ b/source/src/MyTelegram.Messenger/Services/Impl/MessageAppService.cs @@ -1,4 +1,6 @@ -namespace MyTelegram.Messenger.Services.Impl; +using MyTelegram.Messenger.Services.Interfaces; + +namespace MyTelegram.Messenger.Services.Impl; public class MessageAppService( IQueryProcessor queryProcessor, @@ -10,12 +12,69 @@ public class MessageAppService( IUserAppService userAppService, IPrivacyAppService privacyAppService, IContactAppService contactAppService, + IUsernameHelper usernameHelper, IOffsetHelper offsetHelper, IIdGenerator idGenerator) : BaseAppService, IMessageAppService, ITransientDependency { - private const string HashtagPattern = "#(\\w+)"; - private const string UrlPattern = @"(?:^|\s)((https?:\/\/)?[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}(\/[^\s,.:;!?]*)?)"; + private const string HashtagPattern = @"#[A-Za-z][A-Za-z0-9_]{0,255}"; + + // Max length clamp for URL entities + private const int MaxUrlLength = 2048; + + // Use a short timeout to prevent runaway backtracking + private static readonly TimeSpan RxTimeout = TimeSpan.FromMilliseconds(150); + + // Cap for normal letter TLDs: e.g., "com", "technology", "international" (<=15) + private const int TldMaxLetters = 15; + + // Strong URL regex: optional scheme, IPv4 or domain, optional port, path allowing balanced parens; + // no trailing punctuation inside the entity; avoids picking up emails/usernames. + private static readonly Regex UrlRegex = new( + """ +(?xi) +(?\d{2,5}) )? # optional port + +(?: # --- optional path/query/frag --- + / # path starts + (?: + [^\s<>()\[\]{}"'`]+ + | \([^\s<>()\[\]{}"'`]*\) + )* +)? +(?= + \s | $ | [)\]\}.,!?;:] # stop before trailing punctuation/space/end +) +""", + RegexOptions.Compiled | RegexOptions.IgnoreCase | RegexOptions.CultureInvariant, + RxTimeout); + + // Email ranges we want to exclude from mentions + private static readonly Regex EmailRegex = new( + @"(?xi)\b[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,24}\b", + RegexOptions.Compiled | RegexOptions.CultureInvariant | RegexOptions.IgnoreCase, + RxTimeout); public void CheckBotPermission(long requestUserId, Peer toPeer) { @@ -365,7 +424,7 @@ await queryProcessor.ProcessAsync(new GetReplyToMsgIdListQuery(input.ToPeer, inp int? views = null; if (channelReadModel?.Broadcast ?? false) { - views = 1; + views = 0; } if (channelReadModel is { Signatures: true, Broadcast: true }) { @@ -515,110 +574,18 @@ private async Task CheckGlobalPrivacySettingsAsync(SendMessageInput input) public Task> ProcessMessageEntitiesAsync(string? message, IList? entities, Peer toPeer) { - if (string.IsNullOrEmpty(message)) - { + if (string.IsNullOrWhiteSpace(message)) return Task.FromResult>([]); - } - - ProcessMessageEntityHashtag(message, entities); - ProcessMessageEntityUrlList(message, entities); - return ProcessMessageEntityMentionAsync(message, entities, toPeer); - } - - private async Task> ProcessMessageEntityMentionAsync(string message, IList? entities, Peer toPeer) - { - var mentionsAndUserNames = GetMentions(message); - var mentions = mentionsAndUserNames.mentions; - var mentionedUserNames = mentionsAndUserNames.userNameList; - var mentionedUserIds = new List(); - - if (entities?.Count > 0) - { - foreach (var messageEntity in entities) - { - switch (messageEntity) - { - case TInputMessageEntityMentionName inputMessageEntityMentionName: - var userPeer = peerHelper.GetPeer(inputMessageEntityMentionName.UserId); - mentionedUserIds.Add(userPeer.PeerId); - break; - case TMessageEntityMention messageEntityMention: - mentionedUserNames.Add(message.Substring(messageEntityMention.Offset + 1, - messageEntityMention.Length - 1)); - break; - case TMessageEntityMentionName messageEntityMentionName: - mentionedUserIds.Add(messageEntityMentionName.UserId); - break; - } - } - } - - if (mentionedUserNames.Count > 0) - { - entities ??= []; - foreach (var messageEntityMention in mentions) - { - entities.Add(messageEntityMention); - } - } - if (toPeer.PeerType == PeerType.Channel) - { - var mentionedUsers = - await queryProcessor.ProcessAsync(new GetUserNameListByNamesQuery(mentionedUserNames, PeerType.User)); - mentionedUserIds.AddRange(mentionedUsers.Select(p => p.PeerId).Distinct().ToList()); - - var memberUserIds = - await queryProcessor.ProcessAsync(new GetChannelMemberIdListQuery(toPeer.PeerId, mentionedUserIds)); - - mentionedUserIds = memberUserIds.ToList(); - } - else - { - mentionedUserIds = []; - } - - return mentionedUserIds; - } - - private (List mentions, List userNameList) GetMentions(string message) - { - var pattern = "@(\\w{4,40})"; - var mentions = new List(); - var matches = Regex.Matches(message, pattern); - var userNameList = new List(); - foreach (Match match in matches) - { - if (match.Success) - { - mentions.Add(new TMessageEntityMention - { - Offset = match.Index, - Length = match.Length - }); - userNameList.Add(match.Value[1..]); - } - } + // 1) URLs first (also returns overlap guard map) + var used = ProcessMessageEntityUrlListWithOverlap(message, ref entities); - return (mentions, userNameList); - } + // 2) Hashtags (no overlap concerns in your spec, but we can keep as-is) + ProcessMessageEntityHashtag(message, entities); - private void ProcessMessageEntityUrlList(string message, IList? entities) - { - var matches = Regex.Matches(message, UrlPattern); - foreach (Match match in matches) - { - if (match.Success) - { - var entity = new TMessageEntityUrl - { - Offset = match.Index, - Length = match.Length - }; - entities ??= []; - entities.Add(entity); - } - } + // 3) Mentions (skip any overlap with URLs/emails) + var result = ProcessMessageEntityMentionAsyncSafe(message, entities, toPeer, used); + return result; } private void ProcessMessageEntityHashtag(string message, IList? entities) @@ -807,5 +774,263 @@ void AddPeerIdIfNeeded(Peer? peer) break; } } + + } + + // Creates URL entities, respecting max length and "one entity per character" rule. + // Returns a boolean mask of used characters (to prevent overlaps with mentions). + private static bool[] ProcessMessageEntityUrlListWithOverlap(string message, ref IList? entities) + { + var used = new bool[message.Length]; + var matches = UrlRegex.Matches(message); + if (matches.Count == 0) return used; + + entities ??= []; + + foreach (Match m in matches) + { + var (start, length) = TrimTrailingPunctuationAndBalance(message, m.Index, m.Length); + if (length <= 0) continue; + + if (length > MaxUrlLength) + length = MaxUrlLength; + + if (AnyUsed(used, start, length)) + continue; + + MarkUsed(used, start, length); + + entities.Add(new TMessageEntityUrl + { + Offset = start, + Length = length + }); + } + + return used; + } + + private static (int start, int length) TrimTrailingPunctuationAndBalance(string s, int start, int length) + { + if (length <= 0) return (start, 0); + + while (length > 0) + { + char ch = s[start + length - 1]; + if (")]},.!?;:".IndexOf(ch) >= 0) length--; + else break; + } + + // Balance trailing ')' if they exceed '(' in the captured piece + int open = 0, close = 0; + for (int i = 0; i < length; i++) + { + char c = s[start + i]; + if (c == '(') open++; + else if (c == ')') close++; + } + while (length > 0 && close > open) + { + if (s[start + length - 1] == ')') { length--; close--; } + else break; + } + + return (start, length); + } + + private static bool AnyUsed(bool[] used, int start, int len) + { + int end = Math.Min(used.Length, start + len); + for (int i = start; i < end; i++) + if (used[i]) return true; + return false; + } + + private static void MarkUsed(bool[] used, int start, int len) + { + int end = Math.Min(used.Length, start + len); + for (int i = start; i < end; i++) + used[i] = true; + } + + // New mention processor that ignores usernames inside URLs or emails + private async Task> ProcessMessageEntityMentionAsyncSafe( + string message, + IList? entities, + Peer toPeer, + bool[] usedByUrls) + { + // Mark email ranges as used too (so @ inside email never becomes mention) + var used = (bool[])usedByUrls.Clone(); + foreach (Match em in EmailRegex.Matches(message)) + MarkUsed(used, em.Index, em.Length); + + // Collect inline-provided entities first (existing logic preserved) + var mentionedUserIds = new List(); + var candidateUsernames = new List(); + var mentionEntities = new List(); + + if (entities is { Count: > 0 }) + { + foreach (var e in entities) + { + switch (e) + { + case TInputMessageEntityMentionName named: + mentionedUserIds.Add(peerHelper.GetPeer(named.UserId).PeerId); + break; + + case TMessageEntityMention m: + // Keep these, but we’ll add text-parsed mentions below (non-overlapping) + candidateUsernames.Add(message.Substring(m.Offset + 1, m.Length - 1)); + mentionEntities.Add(m); + break; + + case TMessageEntityMentionName mn: + mentionedUserIds.Add(mn.UserId); + break; + } + } + } + + //// Text-based mentions using safe regex + overlap guard + //foreach (Match mm in MentionRegex.Matches(message)) + //{ + // var start = mm.Index; + // var length = mm.Length; + + // if (AnyUsed(used, start, length)) + // continue; // skip if inside URL/email (or already an entity) + + // var uname = mm.Groups[1].Value; // without '@' + // // mark the range as used so nothing overlaps + // MarkUsed(used, start, length); + + // candidateUsernames.Add(uname); + // mentionEntities.Add(new TMessageEntityMention { Offset = start, Length = length }); + //} + + foreach (var (start, length, uname) in usernameHelper.FindMentions(message)) + { + if (AnyUsed(used, start, length)) + continue; + + MarkUsed(used, start, length); + candidateUsernames.Add(uname); + mentionEntities.Add(new TMessageEntityMention { Offset = start, Length = length }); + } + + if (mentionEntities.Count > 0) + { + entities ??= []; + foreach (var m in mentionEntities) + entities.Add(m); + } + + // Resolve to user IDs (same as your original logic) + if (toPeer.PeerType == PeerType.Channel && candidateUsernames.Count > 0) + { + var mentionedUsers = await queryProcessor.ProcessAsync( + new GetUserNameListByNamesQuery(candidateUsernames, PeerType.User)); + + mentionedUserIds.AddRange(mentionedUsers.Select(p => p.PeerId).Distinct()); + + var memberUserIds = await queryProcessor.ProcessAsync( + new GetChannelMemberIdListQuery(toPeer.PeerId, mentionedUserIds)); + + return memberUserIds.ToList(); + } + + return []; + } + + public async Task CreateInvitePreviewIfAnyAsync( + string text, + string joinChatDomain) + { + if (string.IsNullOrWhiteSpace(text)) + return null; + + var inviteRx = BuildInviteRegex(joinChatDomain); // add the builder below if you don't have it yet + + // Scan URLs once, trim punctuation, clamp length + var matches = UrlRegex.Matches(text); + if (matches.Count == 0) + return null; + + foreach (Match m in matches) + { + var (start, length) = TrimTrailingPunctuationAndBalance(text, m.Index, m.Length); + if (length <= 0) continue; + if (length > MaxUrlLength) length = MaxUrlLength; + + var url = text.Substring(start, length); + var im = inviteRx.Match(url); + if (!im.Success) continue; + + var link = im.Groups["link"].Value; + + var chatInvite = await queryProcessor.ProcessAsync(new GetChatInviteByLinkQuery(link)); + if (chatInvite is null) continue; + + var channel = await channelAppService.GetAsync(chatInvite.PeerId); + + // Supergroup/public channel preview only + if (!channel.Broadcast || (channel.Broadcast && !string.IsNullOrEmpty(channel.UserName))) + { + var baseJoin = joinChatDomain.TrimEnd('/'); + return new TMessageMediaWebPage + { + Webpage = new Schema.TWebPage + { + Id = Random.Shared.NextInt64(), + Url = $"{baseJoin}/+{link}", + DisplayUrl = $"{baseJoin}/+{link}", + Type = channel.Broadcast ? "telegram_channel" : "telegram_megagroup", + SiteName = "MyTelegram", + Title = channel.Title, + Description = "Join this group on MyTelegram.", + } + }; + } + + // Only one preview is allowed; if this one is not eligible, continue scanning. + } + + return null; + } + + private Regex BuildInviteRegex(string joinDomain) + { + var normalized = joinDomain.Trim().TrimEnd('/'); + string host, path = ""; + try + { + if (normalized.StartsWith("http", StringComparison.OrdinalIgnoreCase)) + { + var u = new Uri(normalized); + host = u.Host; + path = u.AbsolutePath.Trim('/'); + } + else + { + var slash = normalized.IndexOf('/'); + host = slash >= 0 ? normalized[..slash] : normalized; + path = slash >= 0 ? normalized[(slash + 1)..] : ""; + } + } + catch { host = normalized; } + + var hostRx = Regex.Escape(host); + var pathRx = string.IsNullOrEmpty(path) ? "" : $"{Regex.Escape(path)}/"; + + var pat = $$""" + (?xi) + \b + (?:https?://)? (?:www\.)? {{hostRx}} / {{pathRx}} \+ (?[A-Za-z0-9_-]{16,64}) + (?= \s | $ | [)\]\}.,!?;:] ) + """; + + return new Regex(pat, RegexOptions.Compiled | RegexOptions.IgnoreCase | RegexOptions.CultureInvariant, RxTimeout); } } \ No newline at end of file diff --git a/source/src/MyTelegram.Messenger/Services/Impl/UsernameHelper.cs b/source/src/MyTelegram.Messenger/Services/Impl/UsernameHelper.cs index b547798c0..2eb14966c 100644 --- a/source/src/MyTelegram.Messenger/Services/Impl/UsernameHelper.cs +++ b/source/src/MyTelegram.Messenger/Services/Impl/UsernameHelper.cs @@ -2,9 +2,79 @@ public class UsernameHelper : IUsernameHelper, ITransientDependency { - private static readonly string Pattern = "^[a-z0-9_]{5,32}$"; - public bool IsValidUsername(string username) + private static readonly TimeSpan RxTimeout = TimeSpan.FromMilliseconds(150); + + // Validation for actual usernames (Telegram-like) + private static readonly Regex UsernameValidation = new( + @"^(?=.{5,32}$)[a-z0-9_]+$", + RegexOptions.Compiled | RegexOptions.CultureInvariant | RegexOptions.IgnoreCase, + RxTimeout); + + // Same URL + email regexes as service (kept private here for exclusion) + private static readonly Regex UrlRegex = new( + """ + (?xi) + (?()\[\]{}"'`]+|\([^\s<>()\[\]{}"'`]*\))*)? + (?=\s|$|[)\]\}.,!?;:]) + """, + RegexOptions.Compiled | RegexOptions.CultureInvariant | RegexOptions.IgnoreCase, + RxTimeout); + + private static readonly Regex EmailRegex = new( + @"(?xi)\b[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,24}\b", + RegexOptions.Compiled | RegexOptions.CultureInvariant | RegexOptions.IgnoreCase, + RxTimeout); + + // Mention detection with email/TLD safeguard; final exclusion via overlap below + private static readonly Regex MentionRegex = new( + @"(?xi)(? UsernameValidation.IsMatch(username ?? string.Empty); + + public IEnumerable<(int Offset, int Length, string Username)> FindMentions(string text) { - return Regex.IsMatch(username, Pattern, RegexOptions.IgnoreCase); + if (string.IsNullOrEmpty(text)) + yield break; + + var used = new bool[text.Length]; + + // Mark URL + email spans as excluded + foreach (Match m in UrlRegex.Matches(text)) + Mark(used, m.Index, m.Length); + foreach (Match m in EmailRegex.Matches(text)) + Mark(used, m.Index, m.Length); + + foreach (Match m in MentionRegex.Matches(text)) + { + int start = m.Index, len = m.Length; + if (Overlaps(used, start, len)) + continue; + + var uname = m.Groups[1].Value; + yield return (start, len, uname); + } + + static void Mark(bool[] a, int s, int l) + { + int e = Math.Min(a.Length, s + l); + for (int i = Math.Max(0, s); i < e; i++) a[i] = true; + } + + static bool Overlaps(bool[] a, int s, int l) + { + int e = Math.Min(a.Length, s + l); + for (int i = Math.Max(0, s); i < e; i++) + if (a[i]) return true; + return false; + } } -} \ No newline at end of file +} diff --git a/source/src/MyTelegram.Messenger/Services/Interfaces/IMessageAppService.cs b/source/src/MyTelegram.Messenger/Services/Interfaces/IMessageAppService.cs index f3b8a06b7..f4db82f21 100644 --- a/source/src/MyTelegram.Messenger/Services/Interfaces/IMessageAppService.cs +++ b/source/src/MyTelegram.Messenger/Services/Interfaces/IMessageAppService.cs @@ -23,4 +23,7 @@ public interface IMessageAppService Task> ProcessMessageEntitiesAsync(string? message, IList? entities, Peer toPeer); List GetHashtags(string? message); Task IsValidSendAsPeerAsync(long requestUserId, Peer toPeer, Peer? sendAsPeer); + Task CreateInvitePreviewIfAnyAsync( + string text, + string joinChatDomain); } \ No newline at end of file diff --git a/source/src/MyTelegram.Messenger/Services/Interfaces/IUsernameHelper.cs b/source/src/MyTelegram.Messenger/Services/Interfaces/IUsernameHelper.cs index f31ef49c7..29e442648 100644 --- a/source/src/MyTelegram.Messenger/Services/Interfaces/IUsernameHelper.cs +++ b/source/src/MyTelegram.Messenger/Services/Interfaces/IUsernameHelper.cs @@ -3,4 +3,5 @@ public interface IUsernameHelper { bool IsValidUsername(string username); + IEnumerable<(int Offset, int Length, string Username)> FindMentions(string text); } \ No newline at end of file diff --git a/source/src/MyTelegram.ReadModel/Impl/DialogReadModel.cs b/source/src/MyTelegram.ReadModel/Impl/DialogReadModel.cs index 7397f3c00..c001e6e99 100644 --- a/source/src/MyTelegram.ReadModel/Impl/DialogReadModel.cs +++ b/source/src/MyTelegram.ReadModel/Impl/DialogReadModel.cs @@ -23,7 +23,9 @@ public class DialogReadModel : IDialogReadModel, IAmReadModelFor, IAmReadModelFor, IAmReadModelFor, - IAmReadModelFor + IAmReadModelFor, + IAmReadModelFor, + IAmReadModelFor { public virtual int ChannelHistoryMinId { get; private set; } public virtual DateTime CreationTime { get; private set; } @@ -369,4 +371,20 @@ public Task ApplyAsync(IReadModelContext context, IDomainEvent domainEvent, + CancellationToken cancellationToken) + { + UnreadMentionsCount = domainEvent.AggregateEvent.UnreadMentionsCount; + return Task.CompletedTask; + } + + public Task ApplyAsync(IReadModelContext context, + IDomainEvent domainEvent, + CancellationToken cancellationToken) + { + UnreadMentionsCount = domainEvent.AggregateEvent.UnreadMentionsCount; + return Task.CompletedTask; + } } diff --git a/source/src/MyTelegram.ReadModel/Impl/MessageReadModel.cs b/source/src/MyTelegram.ReadModel/Impl/MessageReadModel.cs index 22ab605e3..0245785e5 100644 --- a/source/src/MyTelegram.ReadModel/Impl/MessageReadModel.cs +++ b/source/src/MyTelegram.ReadModel/Impl/MessageReadModel.cs @@ -23,7 +23,8 @@ public class MessageReadModel : IMessageReadModel, IAmReadModelFor, IAmReadModelFor, IAmReadModelFor, - IAmReadModelFor + IAmReadModelFor, + IAmReadModelFor { public int Date { get; private set; } public int? EditDate { get; private set; } @@ -92,11 +93,11 @@ public class MessageReadModel : IMessageReadModel, public bool InvertMedia { get; private set; } public bool PublicPosts { get; private set; } public List Hashtags { get; private set; } = []; - public List? MentionedUserIds { get; private set; } - public long? TodoId { get; private set; } - - public Task ApplyAsync(IReadModelContext context, - IDomainEvent domainEvent, + public List? MentionedUserIds { get; private set; } + public long? TodoId { get; private set; } + + public Task ApplyAsync(IReadModelContext context, + IDomainEvent domainEvent, CancellationToken cancellationToken) { var messageItem = domainEvent.AggregateEvent.OutboxMessageItem; @@ -216,10 +217,19 @@ public Task ApplyAsync(IReadModelContext context, ExpirationTime = messageItem.Date + messageItem.TtlPeriod.Value; } - InvertMedia = messageItem.InvertMedia; - - return Task.CompletedTask; - } + InvertMedia = messageItem.InvertMedia; + + return Task.CompletedTask; + } + + public Task ApplyAsync(IReadModelContext context, + IDomainEvent domainEvent, + CancellationToken cancellationToken) + { + Views = domainEvent.AggregateEvent.Views; + + return Task.CompletedTask; + } public Task ApplyAsync(IReadModelContext context, IDomainEvent domainEvent, From 52b9ab27abc6bfa808ef2f6bb46ac2637402c2c5 Mon Sep 17 00:00:00 2001 From: Ali Khadivi Date: Tue, 7 Oct 2025 08:32:07 +0330 Subject: [PATCH 2/6] Fixed Mentions and Formated MessageAppService --- .../MyMessengerJsonContext.g.cs | 1 + .../Aggregates/Dialog/DialogAggregate.cs | 4 +- .../Dialog/ReadMentionCommandHandler.cs | 2 +- .../Commands/Dialog/ReadMentionCommand.cs | 9 +- .../Events/Dialog/MentionReadEvent.cs | 9 +- .../Sagas/Identities/ReadMentionsSagaId.cs | 4 + .../Identities/ReadMentionsSagaLocator.cs | 9 + .../Sagas/ReadMentionsSaga.cs | 50 +++ .../DomainEventHandlers/PtsEventHandler.cs | 10 + .../DomainEventHandlers/PtsEventHandler.cs | 30 +- .../ReadMentionsDomainEventHandler.cs | 29 ++ .../Messages/ReadMentionsHandler.cs | 3 +- .../NativeAot/MyMessengerJsonContext.g.cs | 7 +- .../NativeAot/MySagaAggregateStore.cs | 17 +- .../Impl/ChannelMessageViewsAppService.cs | 9 +- .../Services/Impl/MessageAppService.cs | 292 ++++++------------ .../IChannelMessageViewsAppService.cs | 2 - 17 files changed, 253 insertions(+), 234 deletions(-) create mode 100644 source/src/MyTelegram.Domain/Sagas/Identities/ReadMentionsSagaId.cs create mode 100644 source/src/MyTelegram.Domain/Sagas/Identities/ReadMentionsSagaLocator.cs create mode 100644 source/src/MyTelegram.Domain/Sagas/ReadMentionsSaga.cs create mode 100644 source/src/MyTelegram.Messenger.QueryServer/DomainEventHandlers/ReadMentionsDomainEventHandler.cs diff --git a/source/src/MyTelegram.DataSeeder/MyMessengerJsonContext.g.cs b/source/src/MyTelegram.DataSeeder/MyMessengerJsonContext.g.cs index 21de9e5ca..c60671a88 100644 --- a/source/src/MyTelegram.DataSeeder/MyMessengerJsonContext.g.cs +++ b/source/src/MyTelegram.DataSeeder/MyMessengerJsonContext.g.cs @@ -260,6 +260,7 @@ namespace MyTelegram.Messenger.NativeAot; [JsonSerializable(typeof(MyTelegram.Domain.Sagas.PostChannelIdUpdatedSagaEvent))] [JsonSerializable(typeof(MyTelegram.Domain.Sagas.ReadHistoryPtsIncrementedSagaEvent))] [JsonSerializable(typeof(MyTelegram.Domain.Sagas.ReadHistoryStartedSagaEvent))] +[JsonSerializable(typeof(MyTelegram.Domain.Sagas.ReadMentionsCompletedSagaEvent))] [JsonSerializable(typeof(MyTelegram.Domain.Sagas.ReplyBroadcastChannelCompletedSagaEvent))] [JsonSerializable(typeof(MyTelegram.Domain.Sagas.SetDiscussionGroupSagaStartedSagaEvent))] [JsonSerializable(typeof(MyTelegram.Domain.Sagas.UnpinAllMessagesCompletedSagaEvent))] diff --git a/source/src/MyTelegram.Domain/Aggregates/Dialog/DialogAggregate.cs b/source/src/MyTelegram.Domain/Aggregates/Dialog/DialogAggregate.cs index 6d0dec9c1..bf38af3c4 100644 --- a/source/src/MyTelegram.Domain/Aggregates/Dialog/DialogAggregate.cs +++ b/source/src/MyTelegram.Domain/Aggregates/Dialog/DialogAggregate.cs @@ -156,7 +156,7 @@ int date toPeer, date)); } - public void ReadMention(int messageId, bool readAllMentions = false) + public void ReadMention(RequestInfo requestInfo, int messageId, bool readAllMentions = false) { Specs.AggregateIsCreated.ThrowDomainErrorIfNotSatisfied(this); var unreadMentionsCount = readAllMentions ? 0 : _state.UnreadMentionsCount - 1; @@ -165,7 +165,7 @@ public void ReadMention(int messageId, bool readAllMentions = false) unreadMentionsCount = 0; } - Emit(new MentionReadEvent(_state.OwnerId, _state.ToPeer, messageId, unreadMentionsCount)); + Emit(new MentionReadEvent(requestInfo, _state.OwnerId, _state.ToPeer, messageId, unreadMentionsCount)); } public void ReceiveInboxMessage( diff --git a/source/src/MyTelegram.Domain/CommandHandlers/Dialog/ReadMentionCommandHandler.cs b/source/src/MyTelegram.Domain/CommandHandlers/Dialog/ReadMentionCommandHandler.cs index 107cf368e..bcbb1b5f0 100644 --- a/source/src/MyTelegram.Domain/CommandHandlers/Dialog/ReadMentionCommandHandler.cs +++ b/source/src/MyTelegram.Domain/CommandHandlers/Dialog/ReadMentionCommandHandler.cs @@ -4,7 +4,7 @@ public class ReadMentionCommandHandler : CommandHandler(aggregateId) +public class ReadMentionCommand( + DialogId aggregateId, + RequestInfo requestInfo, + long ownerUserId, + int messageId, + bool readAllMentions = false) + : RequestCommand2(aggregateId, requestInfo) { public long OwnerUserId { get; } = ownerUserId; diff --git a/source/src/MyTelegram.Domain/Events/Dialog/MentionReadEvent.cs b/source/src/MyTelegram.Domain/Events/Dialog/MentionReadEvent.cs index 39f944ad6..998617842 100644 --- a/source/src/MyTelegram.Domain/Events/Dialog/MentionReadEvent.cs +++ b/source/src/MyTelegram.Domain/Events/Dialog/MentionReadEvent.cs @@ -1,7 +1,12 @@ namespace MyTelegram.Domain.Events.Dialog; -public class MentionReadEvent(long ownerUserId, Peer toPeer, int messageId, int unreadMentionsCount) - : AggregateEvent +public class MentionReadEvent( + RequestInfo requestInfo, + long ownerUserId, + Peer toPeer, + int messageId, + int unreadMentionsCount) + : RequestAggregateEvent2(requestInfo) { public long OwnerUserId { get; } = ownerUserId; public Peer ToPeer { get; } = toPeer; diff --git a/source/src/MyTelegram.Domain/Sagas/Identities/ReadMentionsSagaId.cs b/source/src/MyTelegram.Domain/Sagas/Identities/ReadMentionsSagaId.cs new file mode 100644 index 000000000..d940aea7a --- /dev/null +++ b/source/src/MyTelegram.Domain/Sagas/Identities/ReadMentionsSagaId.cs @@ -0,0 +1,4 @@ +namespace MyTelegram.Domain.Sagas.Identities; + +[JsonConverter(typeof(SystemTextJsonSingleValueObjectConverter))] +public class ReadMentionsSagaId(string value) : SingleValueObject(value), ISagaId; diff --git a/source/src/MyTelegram.Domain/Sagas/Identities/ReadMentionsSagaLocator.cs b/source/src/MyTelegram.Domain/Sagas/Identities/ReadMentionsSagaLocator.cs new file mode 100644 index 000000000..fea550209 --- /dev/null +++ b/source/src/MyTelegram.Domain/Sagas/Identities/ReadMentionsSagaLocator.cs @@ -0,0 +1,9 @@ +namespace MyTelegram.Domain.Sagas.Identities; + +public class ReadMentionsSagaLocator : DefaultSagaLocator +{ + protected override ReadMentionsSagaId CreateSagaId(string requestId) + { + return new ReadMentionsSagaId(requestId); + } +} diff --git a/source/src/MyTelegram.Domain/Sagas/ReadMentionsSaga.cs b/source/src/MyTelegram.Domain/Sagas/ReadMentionsSaga.cs new file mode 100644 index 000000000..a765e8e7e --- /dev/null +++ b/source/src/MyTelegram.Domain/Sagas/ReadMentionsSaga.cs @@ -0,0 +1,50 @@ +namespace MyTelegram.Domain.Sagas; + +public class ReadMentionsCompletedSagaEvent( + RequestInfo requestInfo, + long userId, + Peer toPeer, + int messageId, + int unreadMentionsCount, + int pts) + : RequestAggregateEvent2(requestInfo) +{ + public long UserId { get; } = userId; + public Peer ToPeer { get; } = toPeer; + public int MessageId { get; } = messageId; + public int UnreadMentionsCount { get; } = unreadMentionsCount; + public int Pts { get; } = pts; + public int PtsCount { get; } = 0; +} + +public class ReadMentionsSaga : MyInMemoryAggregateSaga, + ISagaIsStartedBy +{ + private readonly IIdGenerator _idGenerator; + + public ReadMentionsSaga(ReadMentionsSagaId id, IEventStore eventStore, IIdGenerator idGenerator) : base(id, eventStore) + { + _idGenerator = idGenerator; + } + + public async Task HandleAsync( + IDomainEvent domainEvent, + ISagaContext sagaContext, + CancellationToken cancellationToken) + { + var pts = await _idGenerator.NextIdAsync( + IdType.Pts, + domainEvent.AggregateEvent.OwnerUserId, + cancellationToken: cancellationToken); + + Emit(new ReadMentionsCompletedSagaEvent( + domainEvent.AggregateEvent.RequestInfo, + domainEvent.AggregateEvent.OwnerUserId, + domainEvent.AggregateEvent.ToPeer, + domainEvent.AggregateEvent.MessageId, + domainEvent.AggregateEvent.UnreadMentionsCount, + pts)); + + await CompleteAsync(cancellationToken); + } +} diff --git a/source/src/MyTelegram.Messenger.CommandServer/DomainEventHandlers/PtsEventHandler.cs b/source/src/MyTelegram.Messenger.CommandServer/DomainEventHandlers/PtsEventHandler.cs index 03ebd21f6..e060b7342 100644 --- a/source/src/MyTelegram.Messenger.CommandServer/DomainEventHandlers/PtsEventHandler.cs +++ b/source/src/MyTelegram.Messenger.CommandServer/DomainEventHandlers/PtsEventHandler.cs @@ -25,6 +25,7 @@ public class PtsEventHandler(IPtsHelper ptsHelper, ISubscribeSynchronousTo, ISubscribeSynchronousTo, ISubscribeSynchronousTo, + ISubscribeSynchronousTo, ISubscribeSynchronousTo, ISubscribeSynchronousTo, ISubscribeSynchronousTo @@ -134,6 +135,15 @@ public async Task HandleAsync(IDomainEvent domainEvent, + CancellationToken cancellationToken) + { + return UpdatePtsAsync(domainEvent.AggregateEvent.UserId, + domainEvent.AggregateEvent.Pts, + domainEvent.AggregateEvent.PtsCount); + } + private async Task IncrementGlobalSeqNoAsync(long userId) { var globalSeqNo = await idGenerator.NextLongIdAsync(IdType.GlobalSeqNo); diff --git a/source/src/MyTelegram.Messenger.QueryServer/DomainEventHandlers/PtsEventHandler.cs b/source/src/MyTelegram.Messenger.QueryServer/DomainEventHandlers/PtsEventHandler.cs index 3140b6bda..a99199473 100644 --- a/source/src/MyTelegram.Messenger.QueryServer/DomainEventHandlers/PtsEventHandler.cs +++ b/source/src/MyTelegram.Messenger.QueryServer/DomainEventHandlers/PtsEventHandler.cs @@ -22,7 +22,8 @@ public class PtsEventHandler( DeleteReplyMessagePtsIncrementedSagaEvent>, ISubscribeSynchronousTo, ISubscribeSynchronousTo, - ISubscribeSynchronousTo + ISubscribeSynchronousTo, + ISubscribeSynchronousTo { public async Task HandleAsync( IDomainEvent domainEvent, @@ -98,15 +99,24 @@ await ptsHelper.IncrementPtsAsync(domainEvent.AggregateEvent.PeerId, domainEvent newUnreadCount: -domainEvent.AggregateEvent.ReadCount); } - public async Task HandleAsync( - IDomainEvent domainEvent, - CancellationToken cancellationToken) - { - foreach (var item in domainEvent.AggregateEvent.MessageItems) - { - await ptsHelper.IncrementPtsAsync(item.OwnerPeer.PeerId, item.Pts); - } - } + public async Task HandleAsync( + IDomainEvent domainEvent, + CancellationToken cancellationToken) + { + foreach (var item in domainEvent.AggregateEvent.MessageItems) + { + await ptsHelper.IncrementPtsAsync(item.OwnerPeer.PeerId, item.Pts); + } + } + + public Task HandleAsync( + IDomainEvent domainEvent, + CancellationToken cancellationToken) + { + return ptsHelper.IncrementPtsAsync(domainEvent.AggregateEvent.UserId, + domainEvent.AggregateEvent.Pts, + domainEvent.AggregateEvent.PtsCount); + } public Task HandleAsync( IDomainEvent domainEvent, diff --git a/source/src/MyTelegram.Messenger.QueryServer/DomainEventHandlers/ReadMentionsDomainEventHandler.cs b/source/src/MyTelegram.Messenger.QueryServer/DomainEventHandlers/ReadMentionsDomainEventHandler.cs new file mode 100644 index 000000000..1ade91a72 --- /dev/null +++ b/source/src/MyTelegram.Messenger.QueryServer/DomainEventHandlers/ReadMentionsDomainEventHandler.cs @@ -0,0 +1,29 @@ +namespace MyTelegram.Messenger.QueryServer.DomainEventHandlers; + +public class ReadMentionsDomainEventHandler( + IObjectMessageSender objectMessageSender, + ICommandBus commandBus, + IIdGenerator idGenerator, + IAckCacheService ackCacheService, + IPushDataFactory pushDataFactory) + : DomainEventHandlerBase(objectMessageSender, commandBus, idGenerator, ackCacheService, pushDataFactory), + ISubscribeSynchronousTo +{ + public async Task HandleAsync( + IDomainEvent domainEvent, + CancellationToken cancellationToken) + { + var affectedHistory = new TAffectedHistory + { + Pts = domainEvent.AggregateEvent.Pts, + PtsCount = domainEvent.AggregateEvent.PtsCount, + Offset = 0 + }; + + await SendRpcMessageToClientAsync( + domainEvent.AggregateEvent.RequestInfo, + affectedHistory, + domainEvent.AggregateEvent.UserId, + domainEvent.AggregateEvent.Pts); + } +} diff --git a/source/src/MyTelegram.Messenger/Handlers/LatestLayer/Messages/ReadMentionsHandler.cs b/source/src/MyTelegram.Messenger/Handlers/LatestLayer/Messages/ReadMentionsHandler.cs index 46295ba60..5430c7063 100644 --- a/source/src/MyTelegram.Messenger/Handlers/LatestLayer/Messages/ReadMentionsHandler.cs +++ b/source/src/MyTelegram.Messenger/Handlers/LatestLayer/Messages/ReadMentionsHandler.cs @@ -30,8 +30,9 @@ internal sealed class ReadMentionsHandler( if (dialogReadModel != null && dialogReadModel.UnreadMentionsCount > 0) { var messageId = obj.TopMsgId ?? dialogReadModel.TopMessage; - var command = new ReadMentionCommand(dialogId, input.UserId, messageId, true); + var command = new ReadMentionCommand(dialogId, input.ToRequestInfo(), input.UserId, messageId, true); await commandBus.PublishAsync(command); + return null!; } return new TAffectedHistory diff --git a/source/src/MyTelegram.Messenger/NativeAot/MyMessengerJsonContext.g.cs b/source/src/MyTelegram.Messenger/NativeAot/MyMessengerJsonContext.g.cs index 21de9e5ca..d76ab6e23 100644 --- a/source/src/MyTelegram.Messenger/NativeAot/MyMessengerJsonContext.g.cs +++ b/source/src/MyTelegram.Messenger/NativeAot/MyMessengerJsonContext.g.cs @@ -258,9 +258,10 @@ namespace MyTelegram.Messenger.NativeAot; [JsonSerializable(typeof(MyTelegram.Domain.Sagas.MessageUnpinnedSagaEvent))] [JsonSerializable(typeof(MyTelegram.Domain.Sagas.PinChannelMessagePtsIncrementedSagaEvent))] [JsonSerializable(typeof(MyTelegram.Domain.Sagas.PostChannelIdUpdatedSagaEvent))] -[JsonSerializable(typeof(MyTelegram.Domain.Sagas.ReadHistoryPtsIncrementedSagaEvent))] -[JsonSerializable(typeof(MyTelegram.Domain.Sagas.ReadHistoryStartedSagaEvent))] -[JsonSerializable(typeof(MyTelegram.Domain.Sagas.ReplyBroadcastChannelCompletedSagaEvent))] +[JsonSerializable(typeof(MyTelegram.Domain.Sagas.ReadHistoryPtsIncrementedSagaEvent))] +[JsonSerializable(typeof(MyTelegram.Domain.Sagas.ReadHistoryStartedSagaEvent))] +[JsonSerializable(typeof(MyTelegram.Domain.Sagas.ReadMentionsCompletedSagaEvent))] +[JsonSerializable(typeof(MyTelegram.Domain.Sagas.ReplyBroadcastChannelCompletedSagaEvent))] [JsonSerializable(typeof(MyTelegram.Domain.Sagas.SetDiscussionGroupSagaStartedSagaEvent))] [JsonSerializable(typeof(MyTelegram.Domain.Sagas.UnpinAllMessagesCompletedSagaEvent))] [JsonSerializable(typeof(MyTelegram.Domain.Sagas.UnpinAllMessagesStartedSagaEvent))] diff --git a/source/src/MyTelegram.Messenger/NativeAot/MySagaAggregateStore.cs b/source/src/MyTelegram.Messenger/NativeAot/MySagaAggregateStore.cs index 386a024ed..53a0e7dda 100644 --- a/source/src/MyTelegram.Messenger/NativeAot/MySagaAggregateStore.cs +++ b/source/src/MyTelegram.Messenger/NativeAot/MySagaAggregateStore.cs @@ -137,13 +137,16 @@ private async Task> UpdateInternalAsync(ISagaI case ReadChannelHistorySagaId readChannelHistorySagaId: domainEvents = await aggregateStore.UpdateAsync(readChannelHistorySagaId, sourceId, updateSaga, cancellationToken); break; - case ReadHistorySagaId readHistorySagaId: - domainEvents = await aggregateStore.UpdateAsync(readHistorySagaId, sourceId, updateSaga, cancellationToken); - break; - - case SignInSagaId signInSagaId: - domainEvents = await aggregateStore.UpdateAsync(signInSagaId, sourceId, updateSaga, cancellationToken); - break; + case ReadHistorySagaId readHistorySagaId: + domainEvents = await aggregateStore.UpdateAsync(readHistorySagaId, sourceId, updateSaga, cancellationToken); + break; + case ReadMentionsSagaId readMentionsSagaId: + domainEvents = await aggregateStore.UpdateAsync(readMentionsSagaId, sourceId, updateSaga, cancellationToken); + break; + + case SignInSagaId signInSagaId: + domainEvents = await aggregateStore.UpdateAsync(signInSagaId, sourceId, updateSaga, cancellationToken); + break; case UpdateContactProfilePhotoSagaId updateContactProfilePhotoSagaId: domainEvents = await aggregateStore.UpdateAsync(updateContactProfilePhotoSagaId, sourceId, updateSaga, cancellationToken); break; diff --git a/source/src/MyTelegram.Messenger/Services/Impl/ChannelMessageViewsAppService.cs b/source/src/MyTelegram.Messenger/Services/Impl/ChannelMessageViewsAppService.cs index 3ae173c81..39cacc288 100644 --- a/source/src/MyTelegram.Messenger/Services/Impl/ChannelMessageViewsAppService.cs +++ b/source/src/MyTelegram.Messenger/Services/Impl/ChannelMessageViewsAppService.cs @@ -11,11 +11,10 @@ public class ChannelMessageViewsAppService( : IChannelMessageViewsAppService, ITransientDependency { public async Task IncrementViewsIfNotIncrementedAsync(long selfUserId, - long authKeyId, long channelId, int messageId) { - var key = GetFilterKey(selfUserId, authKeyId, channelId, messageId); + var key = GetFilterKey(selfUserId, channelId, messageId); var isExists = await cuckooFilter.ExistsAsync(key); if (!isExists) { @@ -24,20 +23,18 @@ public async Task IncrementViewsIfNotIncrementedAsync(long selfUserId, } private byte[] GetFilterKey(long selfUserId, - long authKeyId, long channelId, int messageId) => Encoding.UTF8.GetBytes( - $"{MyTelegramConsts.ChannelMessageViewsBloomFilterKey}_{selfUserId}_{authKeyId}_{channelId}_{messageId}"); + $"{MyTelegramConsts.ChannelMessageViewsBloomFilterKey}_{selfUserId}_{channelId}_{messageId}"); public async Task> GetMessageViewsAsync(long selfUserId, - long authKeyId, long channelId, List messageIdList) { var messageIdGreaterThanZeroList = messageIdList.Where(p => p > 0).ToList(); var keyList = messageIdGreaterThanZeroList - .Select(p => GetFilterKey(selfUserId, authKeyId, channelId, p)).ToList(); + .Select(p => GetFilterKey(selfUserId, channelId, p)).ToList(); var needIncrementMessageIdList = new List(); var index = 0; diff --git a/source/src/MyTelegram.Messenger/Services/Impl/MessageAppService.cs b/source/src/MyTelegram.Messenger/Services/Impl/MessageAppService.cs index 2d3adbfd7..18adca038 100644 --- a/source/src/MyTelegram.Messenger/Services/Impl/MessageAppService.cs +++ b/source/src/MyTelegram.Messenger/Services/Impl/MessageAppService.cs @@ -1,4 +1,4 @@ -using MyTelegram.Messenger.Services.Interfaces; +using TWebPage = MyTelegram.Schema.TWebPage; namespace MyTelegram.Messenger.Services.Impl; @@ -32,41 +32,41 @@ public class MessageAppService( // no trailing punctuation inside the entity; avoids picking up emails/usernames. private static readonly Regex UrlRegex = new( """ -(?xi) -(?\d{2,5}) )? # optional port - -(?: # --- optional path/query/frag --- - / # path starts - (?: - [^\s<>()\[\]{}"'`]+ - | \([^\s<>()\[\]{}"'`]*\) - )* -)? -(?= - \s | $ | [)\]\}.,!?;:] # stop before trailing punctuation/space/end -) -""", + (?xi) + (?\d{2,5}) )? # optional port + + (?: # --- optional path/query/frag --- + / # path starts + (?: + [^\s<>()\[\]{}"'`]+ + | \([^\s<>()\[\]{}"'`]*\) + )* + )? + (?= + \s | $ | [)\]\}.,!?;:] # stop before trailing punctuation/space/end + ) + """, RegexOptions.Compiled | RegexOptions.IgnoreCase | RegexOptions.CultureInvariant, RxTimeout); @@ -79,9 +79,7 @@ public class MessageAppService( public void CheckBotPermission(long requestUserId, Peer toPeer) { if (peerHelper.IsBotUser(requestUserId) && peerHelper.IsBotUser(toPeer.PeerId)) - { RpcErrors.RpcErrors400.UserIsBot.ThrowRpcError(); - } } public async Task CanSendAsPeerAsync(long channelId, long userId) @@ -94,20 +92,14 @@ public async Task CanSendAsPeerAsync(long channelId, long userId) { var channelAdmin = channelReadModel.AdminList.FirstOrDefault(p => p.UserId == userId); if (channelReadModel.CreatorId == userId || (channelAdmin?.AdminRights.PostMessages ?? false)) - { canSendAsPeer = true; - } } if (!canSendAsPeer) - { // Super group with linked channel/Public super group if (channelReadModel.MegaGroup && (!string.IsNullOrEmpty(channelReadModel.UserName) || channelReadModel.LinkedChatId != null)) - { canSendAsPeer = true; - } - } return canSendAsPeer; } @@ -116,28 +108,19 @@ public async Task IsValidSendAsPeerAsync(long requestUserId, Peer toPeer, { if (sendAsPeer != null) { - if (toPeer.PeerType != PeerType.Channel) - { - return false; - } + if (toPeer.PeerType != PeerType.Channel) return false; switch (sendAsPeer.PeerType) { case PeerType.User: case PeerType.Self: - if (sendAsPeer.PeerId != requestUserId) - { - return false; - } + if (sendAsPeer.PeerId != requestUserId) return false; break; case PeerType.Channel: var canSendAsPeer = await CanSendAsPeerAsync(toPeer.PeerId, requestUserId); - if (!canSendAsPeer) - { - return false; - } + if (!canSendAsPeer) return false; var sendAsChannelReadModel = await channelAppService.GetAsync(sendAsPeer.PeerId); @@ -147,9 +130,7 @@ public async Task IsValidSendAsPeerAsync(long requestUserId, Peer toPeer, (string.IsNullOrEmpty(sendAsChannelReadModel.UserName) && sendAsChannelReadModel.LinkedChatId != toPeer.PeerId && sendAsChannelReadModel.ChannelId != toPeer.PeerId)) - { return false; - } break; } @@ -161,10 +142,7 @@ public async Task IsValidSendAsPeerAsync(long requestUserId, Peer toPeer, public async Task CheckSendAsAsync(long requestUserId, Peer toPeer, Peer? sendAsPeer) { var isValid = await IsValidSendAsPeerAsync(requestUserId, toPeer, sendAsPeer); - if (!isValid) - { - RpcErrors.RpcErrors400.SendAsPeerInvalid.ThrowRpcError(); - } + if (!isValid) RpcErrors.RpcErrors400.SendAsPeerInvalid.ThrowRpcError(); } public async Task GetChannelDifferenceAsync(GetDifferenceInput input) @@ -219,12 +197,10 @@ public Task SearchGlobalAsync(SearchGlobalInput input) { return GetMessagesCoreAsync(input); } + public async Task SendMessageAsync(List inputs) { - if (inputs.Count == 0) - { - throw new ArgumentException(); - } + if (inputs.Count == 0) throw new ArgumentException(); List sendMessageItems = []; var firstInput = inputs.First(); @@ -263,31 +239,24 @@ public async Task SearchPostsAsync(long selfUserId, SearchPos new GetChannelMemberListByChannelIdListQuery(selfUserId, channelIds.ToList())); var photoReadModels = await photoAppService.GetPhotosAsync(channelReadModels); - return new SearchPostsResult(messageReadModels, channelReadModels, channelMemberReadModels, photoReadModels, userReadModels); + return new SearchPostsResult(messageReadModels, channelReadModels, channelMemberReadModels, photoReadModels, + userReadModels); } private async Task CheckChannelBannedRightsAsync(SendMessageInput input) { - if (input.ToPeer.PeerType != PeerType.Channel) - { - return null; - } + if (input.ToPeer.PeerType != PeerType.Channel) return null; var channelReadModel = await channelAppService.GetAsync(input.ToPeer.PeerId); if (channelReadModel!.Broadcast) { var admin = channelReadModel.AdminList.FirstOrDefault(p => p.UserId == input.SenderUserId); if (admin == null || !admin.AdminRights.PostMessages) - { RpcErrors.RpcErrors403.ChatWriteForbidden.ThrowRpcError(); - } } var bannedDefaultRights = channelReadModel.DefaultBannedRights ?? ChatBannedRights.CreateDefaultBannedRights(); - if (bannedDefaultRights.SendMessages) - { - RpcErrors.RpcErrors403.ChatWriteForbidden.ThrowRpcError(); - } + if (bannedDefaultRights.SendMessages) RpcErrors.RpcErrors403.ChatWriteForbidden.ThrowRpcError(); var channelMemberReadModel = await queryProcessor.ProcessAsync(new GetChannelMemberByUserIdQuery(channelReadModel.ChannelId, @@ -298,7 +267,6 @@ await queryProcessor.ProcessAsync(new GetChannelMemberByUserIdQuery(channelReadM { if (channelReadModel is { Broadcast: false, LinkedChatId: not null, JoinToSend: false }) { - } else { @@ -311,20 +279,12 @@ await queryProcessor.ProcessAsync(new GetChannelMemberByUserIdQuery(channelReadM var memberBannedRights = ChatBannedRights.FromValue(channelMemberReadModel.BannedRights, channelMemberReadModel.UntilDate); if (!string.IsNullOrEmpty(input.Message)) - { if (memberBannedRights.SendMessages) - { RpcErrors.RpcErrors400.UserBannedInChannel.ThrowRpcError(); - } - } if (input.Media != null) - { if (memberBannedRights.SendMedia) - { RpcErrors.RpcErrors400.UserBannedInChannel.ThrowRpcError(); - } - } } //if (channelReadModel.SlowModeEnabled) @@ -343,10 +303,7 @@ await queryProcessor.ProcessAsync(new GetChannelMemberByUserIdQuery(channelReadM // 3.If the client does not pass a value, the default SendAsPeer is not set, and in the discussion group, use discussion group as SendAsPeer if (input.SendAs != null) { - if (await IsValidSendAsPeerAsync(input.RequestInfo.UserId, input.ToPeer, input.SendAs)) - { - return input.SendAs; - } + if (await IsValidSendAsPeerAsync(input.RequestInfo.UserId, input.ToPeer, input.SendAs)) return input.SendAs; } else if (input.ToPeer.PeerType == PeerType.Channel) { @@ -355,36 +312,27 @@ await queryProcessor.ProcessAsync(new GetChannelMemberByUserIdQuery(channelReadM if (!await CanSendAsPeerAsync(input.ToPeer.PeerId, input.RequestInfo.UserId)) { var admin = channelReadModel.AdminList.FirstOrDefault(p => p.UserId == input.SenderUserId); - if (admin is { AdminRights.Anonymous: true }) - { - return channelReadModel.ChannelId.ToChannelPeer(); - } + if (admin is { AdminRights.Anonymous: true }) return channelReadModel.ChannelId.ToChannelPeer(); return null; } + Peer? sendAsPeer; var userConfigReadModel = await queryProcessor.ProcessAsync( new GetUserConfigByKeyQuery(input.RequestInfo.UserId, ((int)UserConfigType.SendAsPeer).ToString())); if (userConfigReadModel != null) - { if (long.TryParse(userConfigReadModel.Value, out var sendAsPeerId)) { sendAsPeer = peerHelper.GetPeer(sendAsPeerId); if (await IsValidSendAsPeerAsync(input.RequestInfo.UserId, input.ToPeer, sendAsPeer)) - { return sendAsPeer; - } } - } if (channelReadModel is { MegaGroup: true, LinkedChatId: not null }) { sendAsPeer = channelReadModel.ChannelId.ToChannelPeer(); - if (await IsValidSendAsPeerAsync(input.RequestInfo.UserId, input.ToPeer, sendAsPeer)) - { - return sendAsPeer; - } + if (await IsValidSendAsPeerAsync(input.RequestInfo.UserId, input.ToPeer, sendAsPeer)) return sendAsPeer; } } @@ -400,10 +348,7 @@ private async Task CreateSendMessageItemAsync(SendMessageInput var entities = input.Entities ?? []; var mentionedUserIds = await ProcessMessageEntitiesAsync(input.Message, entities, input.ToPeer); - if (entities.Count == 0) - { - entities = null; - } + if (entities.Count == 0) entities = null; var ownerPeerId = input.ToPeer.PeerType == PeerType.Channel ? input.ToPeer.PeerId : input.SenderUserId; var replyToMsgId = input.InputReplyTo.ToReplyToMsgId(); @@ -422,10 +367,7 @@ await queryProcessor.ProcessAsync(new GetReplyToMsgIdListQuery(input.ToPeer, inp string? postAuthor = null; var isPublicPost = channelReadModel is { Broadcast: true, UserName: not null }; int? views = null; - if (channelReadModel?.Broadcast ?? false) - { - views = 0; - } + if (channelReadModel?.Broadcast ?? false) views = 0; if (channelReadModel is { Signatures: true, Broadcast: true }) { if (sendAs?.PeerType == PeerType.Channel) @@ -439,10 +381,7 @@ await queryProcessor.ProcessAsync(new GetReplyToMsgIdListQuery(input.ToPeer, inp postAuthor = $"{userReadModel.FirstName} {userReadModel.LastName}"; } - if (sendAs == null && channelReadModel.SignatureProfiles) - { - sendAs = input.RequestInfo.UserId.ToUserPeer(); - } + if (sendAs == null && channelReadModel.SignatureProfiles) sendAs = input.RequestInfo.UserId.ToUserPeer(); } var scheduleDate = input.ScheduleDate; @@ -451,29 +390,20 @@ await queryProcessor.ProcessAsync(new GetReplyToMsgIdListQuery(input.ToPeer, inp // If the schedule_date is less than 20 seconds in the future, the message will be sent immediately, // generating a normal updateNewMessage/updateNewChannelMessage. if (scheduleDate.Value - CurrentDate < 20) - { scheduleDate = null; - } else - { idType = IdType.ScheduleMessageId; - } } var pts = 0; MessageReply? reply = null; - if (post && linkedChannelId.HasValue) - { - reply = new MessageReply(linkedChannelId, 0, 0, 0, []); - } + if (post && linkedChannelId.HasValue) reply = new MessageReply(linkedChannelId, 0, 0, 0, []); var messageId = await idGenerator.NextIdAsync(idType, ownerPeerId); //var messageId = 0; int? scheduleMessageId = null; if (idType == IdType.ScheduleMessageId) - { scheduleMessageId = await idGenerator.NextIdAsync(IdType.ScheduleMessageId, ownerPeerId); - } var date = CurrentDate; var hashtags = GetHashtags(input.Message); @@ -526,26 +456,17 @@ await queryProcessor.ProcessAsync(new GetReplyToMsgIdListQuery(input.ToPeer, inp public List GetHashtags(string? message) { - if (string.IsNullOrEmpty(message)) - { - return []; - } + if (string.IsNullOrEmpty(message)) return []; var matches = Regex.Matches(message, HashtagPattern); var hashtags = new List(); const int maxHashtags = 10; foreach (Match match in matches) { - if (hashtags.Count > maxHashtags) - { - break; - } + if (hashtags.Count > maxHashtags) break; var hashtag = match.Groups[1].Value; - if (!hashtags.Contains(hashtag)) - { - hashtags.Add(hashtag); - } + if (!hashtags.Contains(hashtag)) hashtags.Add(hashtag); } return hashtags; @@ -564,9 +485,7 @@ private async Task CheckGlobalPrivacySettingsAsync(SendMessageInput input) var contactType = await contactAppService.GetContactTypeAsync(input.RequestInfo.UserId, input.ToPeer.PeerId); if (contactType != ContactType.Mutual && contactType != ContactType.ContactOfTargetUser) - { RpcErrors.RpcErrors406.PrivacyPremiumRequired.ThrowRpcError(); - } } } } @@ -592,7 +511,6 @@ private void ProcessMessageEntityHashtag(string message, IList? { var hashtagMatches = Regex.Matches(message, HashtagPattern); foreach (Match match in hashtagMatches) - { if (match.Success) { var entity = new TMessageEntityHashtag @@ -603,7 +521,6 @@ private void ProcessMessageEntityHashtag(string message, IList? entities ??= []; entities.Add(entity); } - } } private Task GetMessagesCoreAsync(TRequest input) @@ -621,8 +538,8 @@ private async Task GetMessagesInternalAsync(GetMessagesQuery q IReadOnlyCollection? chats = null) { var messageList = await queryProcessor.ProcessAsync(query); - HashSet userIds = users?.ToHashSet() ?? []; - HashSet channelIds = chats?.ToHashSet() ?? []; + var userIds = users?.ToHashSet() ?? []; + var channelIds = chats?.ToHashSet() ?? []; userIds.Add(query.SelfUserId); AddExtraPeerIds(messageList, userIds, channelIds); @@ -649,25 +566,18 @@ private async Task GetMessagesInternalAsync(GetMessagesQuery q IReadOnlyCollection joinedChannelIdList = new List(); if (channelIds.Count > 0) - { joinedChannelIdList = await queryProcessor .ProcessAsync(new GetJoinedChannelIdListQuery(query.SelfUserId, [.. channelIds])); - } var privacyList = await privacyAppService.GetPrivacyListAsync(userIdList); IReadOnlyCollection channelMemberList = new List(); if (joinedChannelIdList.Count > 0) - { channelMemberList = await queryProcessor .ProcessAsync( new GetChannelMemberListByChannelIdListQuery(query.SelfUserId, joinedChannelIdList.ToList())); - } var pts = query.Pts; - if (pts == 0 && messageList.Count > 0) - { - pts = messageList.Max(p => p.Pts); - } + if (pts == 0 && messageList.Count > 0) pts = messageList.Max(p => p.Pts); var pollIdList = messageList.Where(p => p.PollId.HasValue).Select(p => p.PollId!.Value).ToList(); IReadOnlyCollection? pollReadModels = null; @@ -755,10 +665,7 @@ void AddPeerIdIfNeeded(Peer? peer) switch (messageReadModel.MessageAction) { case TMessageActionChatAddUser messageActionChatAddUser: - foreach (var userId in messageActionChatAddUser.Users) - { - userIds.Add(userId); - } + foreach (var userId in messageActionChatAddUser.Users) userIds.Add(userId); break; case TMessageActionChatJoinedByLink messageActionChatJoinedByLink: @@ -774,7 +681,6 @@ void AddPeerIdIfNeeded(Peer? peer) break; } } - } // Creates URL entities, respecting max length and "one entity per character" rule. @@ -816,40 +722,47 @@ private static (int start, int length) TrimTrailingPunctuationAndBalance(string while (length > 0) { - char ch = s[start + length - 1]; + var ch = s[start + length - 1]; if (")]},.!?;:".IndexOf(ch) >= 0) length--; else break; } // Balance trailing ')' if they exceed '(' in the captured piece int open = 0, close = 0; - for (int i = 0; i < length; i++) + for (var i = 0; i < length; i++) { - char c = s[start + i]; + var c = s[start + i]; if (c == '(') open++; else if (c == ')') close++; } + while (length > 0 && close > open) - { - if (s[start + length - 1] == ')') { length--; close--; } - else break; - } + if (s[start + length - 1] == ')') + { + length--; + close--; + } + else + { + break; + } return (start, length); } private static bool AnyUsed(bool[] used, int start, int len) { - int end = Math.Min(used.Length, start + len); - for (int i = start; i < end; i++) - if (used[i]) return true; + var end = Math.Min(used.Length, start + len); + for (var i = start; i < end; i++) + if (used[i]) + return true; return false; } private static void MarkUsed(bool[] used, int start, int len) { - int end = Math.Min(used.Length, start + len); - for (int i = start; i < end; i++) + var end = Math.Min(used.Length, start + len); + for (var i = start; i < end; i++) used[i] = true; } @@ -871,9 +784,7 @@ private async Task> ProcessMessageEntityMentionAsyncSafe( var mentionEntities = new List(); if (entities is { Count: > 0 }) - { foreach (var e in entities) - { switch (e) { case TInputMessageEntityMentionName named: @@ -890,25 +801,6 @@ private async Task> ProcessMessageEntityMentionAsyncSafe( mentionedUserIds.Add(mn.UserId); break; } - } - } - - //// Text-based mentions using safe regex + overlap guard - //foreach (Match mm in MentionRegex.Matches(message)) - //{ - // var start = mm.Index; - // var length = mm.Length; - - // if (AnyUsed(used, start, length)) - // continue; // skip if inside URL/email (or already an entity) - - // var uname = mm.Groups[1].Value; // without '@' - // // mark the range as used so nothing overlaps - // MarkUsed(used, start, length); - - // candidateUsernames.Add(uname); - // mentionEntities.Add(new TMessageEntityMention { Offset = start, Length = length }); - //} foreach (var (start, length, uname) in usernameHelper.FindMentions(message)) { @@ -981,7 +873,7 @@ private async Task> ProcessMessageEntityMentionAsyncSafe( var baseJoin = joinChatDomain.TrimEnd('/'); return new TMessageMediaWebPage { - Webpage = new Schema.TWebPage + Webpage = new TWebPage { Id = Random.Shared.NextInt64(), Url = $"{baseJoin}/+{link}", @@ -989,7 +881,7 @@ private async Task> ProcessMessageEntityMentionAsyncSafe( Type = channel.Broadcast ? "telegram_channel" : "telegram_megagroup", SiteName = "MyTelegram", Title = channel.Title, - Description = "Join this group on MyTelegram.", + Description = "Join this group on MyTelegram." } }; } @@ -1019,18 +911,22 @@ private Regex BuildInviteRegex(string joinDomain) path = slash >= 0 ? normalized[(slash + 1)..] : ""; } } - catch { host = normalized; } + catch + { + host = normalized; + } var hostRx = Regex.Escape(host); var pathRx = string.IsNullOrEmpty(path) ? "" : $"{Regex.Escape(path)}/"; var pat = $$""" - (?xi) - \b - (?:https?://)? (?:www\.)? {{hostRx}} / {{pathRx}} \+ (?[A-Za-z0-9_-]{16,64}) - (?= \s | $ | [)\]\}.,!?;:] ) - """; - - return new Regex(pat, RegexOptions.Compiled | RegexOptions.IgnoreCase | RegexOptions.CultureInvariant, RxTimeout); + (?xi) + \b + (?:https?://)? (?:www\.)? {{hostRx}} / {{pathRx}} \+ (?[A-Za-z0-9_-]{16,64}) + (?= \s | $ | [)\]\}.,!?;:] ) + """; + + return new Regex(pat, RegexOptions.Compiled | RegexOptions.IgnoreCase | RegexOptions.CultureInvariant, + RxTimeout); } } \ No newline at end of file diff --git a/source/src/MyTelegram.Messenger/Services/Interfaces/IChannelMessageViewsAppService.cs b/source/src/MyTelegram.Messenger/Services/Interfaces/IChannelMessageViewsAppService.cs index 34aec44c5..772f88675 100644 --- a/source/src/MyTelegram.Messenger/Services/Interfaces/IChannelMessageViewsAppService.cs +++ b/source/src/MyTelegram.Messenger/Services/Interfaces/IChannelMessageViewsAppService.cs @@ -5,11 +5,9 @@ namespace MyTelegram.Messenger.Services.Interfaces; public interface IChannelMessageViewsAppService { Task IncrementViewsIfNotIncrementedAsync(long selfUserId, - long authKeyId, long channelId, int messageId); Task> GetMessageViewsAsync(long selfUserId, - long authKeyId, long channelId, List messageIdList); } \ No newline at end of file From 36dc308efae4a332d64b166f2f890571b09268fd Mon Sep 17 00:00:00 2001 From: Ali Khadivi Date: Tue, 7 Oct 2025 08:37:22 +0330 Subject: [PATCH 3/6] Fixed views --- .../ChannelMessageViewsDomainEventHandler.cs | 1 - .../Handlers/LatestLayer/Messages/GetMessagesViewsHandler.cs | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/source/src/MyTelegram.Messenger.QueryServer/DomainEventHandlers/ChannelMessageViewsDomainEventHandler.cs b/source/src/MyTelegram.Messenger.QueryServer/DomainEventHandlers/ChannelMessageViewsDomainEventHandler.cs index 737832eba..19faddd04 100644 --- a/source/src/MyTelegram.Messenger.QueryServer/DomainEventHandlers/ChannelMessageViewsDomainEventHandler.cs +++ b/source/src/MyTelegram.Messenger.QueryServer/DomainEventHandlers/ChannelMessageViewsDomainEventHandler.cs @@ -13,7 +13,6 @@ public async Task HandleAsync(IDomainEvent Date: Tue, 7 Oct 2025 22:35:57 +0330 Subject: [PATCH 4/6] Fixed views - step 2 --- .../MyTelegram.Caching.Redis/IRedisHelper.cs | 11 ++ .../MyTelegramCachingExtensions.cs | 36 ++-- .../MyTelegram.Caching.Redis/RedisHelper.cs | 48 ++++++ .../MyTelegramConsts.cs | 1 + .../Messages/GetMessagesViewsHandler.cs | 67 +++----- .../Impl/ChannelMessageViewsAppService.cs | 154 +++++++++++------- .../IChannelMessageViewsAppService.cs | 2 +- 7 files changed, 204 insertions(+), 115 deletions(-) create mode 100644 source/src/MyTelegram.Caching.Redis/IRedisHelper.cs create mode 100644 source/src/MyTelegram.Caching.Redis/RedisHelper.cs diff --git a/source/src/MyTelegram.Caching.Redis/IRedisHelper.cs b/source/src/MyTelegram.Caching.Redis/IRedisHelper.cs new file mode 100644 index 000000000..5189e747f --- /dev/null +++ b/source/src/MyTelegram.Caching.Redis/IRedisHelper.cs @@ -0,0 +1,11 @@ +namespace MyTelegram.Caching.Redis +{ + public interface IRedisHelper + { + Task SetIfNotExistsAsync(string key, byte[] value, TimeSpan? expiry = null); + Task GetAsync(string key); + Task DeleteAsync(string key); + Task ExistsAsync(string key); + Task IncrementAsync(string key, long value = 1); + } +} diff --git a/source/src/MyTelegram.Caching.Redis/MyTelegramCachingExtensions.cs b/source/src/MyTelegram.Caching.Redis/MyTelegramCachingExtensions.cs index 3d4927caf..c644641a8 100644 --- a/source/src/MyTelegram.Caching.Redis/MyTelegramCachingExtensions.cs +++ b/source/src/MyTelegram.Caching.Redis/MyTelegramCachingExtensions.cs @@ -1,22 +1,38 @@ -using Microsoft.Extensions.Caching.Distributed; -using MyTelegram.Core; -using System.Text.Json; -using System.Text; +using System.Text.Json; using Microsoft.Extensions.Caching.StackExchangeRedis; using Microsoft.Extensions.DependencyInjection; +using MyTelegram.Core; +using StackExchange.Redis; namespace MyTelegram.Caching.Redis; public static class MyTelegramCachingExtensions { - public static IServiceCollection AddMyTelegramStackExchangeRedisCache(this IServiceCollection services, Action? options = null) + public static IServiceCollection AddMyTelegramStackExchangeRedisCache( + this IServiceCollection services, + Action? configureCache = null) { - //services.AddTransient(); - services.AddSingleton(typeof(ICacheManager<>), typeof(CacheManager<>)); - services.AddStackExchangeRedisCache(redisOptions => + // Register distributed cache (high-level) + services.AddStackExchangeRedisCache(options => { configureCache?.Invoke(options); }); + + // Register the low-level Redis multiplexer (for atomic ops) + services.AddSingleton(sp => { - options?.Invoke(redisOptions); + var opts = new RedisCacheOptions(); + configureCache?.Invoke(opts); + + if (string.IsNullOrWhiteSpace(opts.Configuration)) + throw new InvalidOperationException("Redis connection string not set!"); + + var cfg = ConfigurationOptions.Parse(opts.Configuration); + + return ConnectionMultiplexer.Connect(cfg); }); + + // Register cache manager + services.AddSingleton(typeof(ICacheManager<>), typeof(CacheManager<>)); + services.AddSingleton(); + return services; } @@ -26,7 +42,7 @@ public static IServiceCollection AddCacheJsonSerializer(this IServiceCollection var options = new JsonSerializerOptions(JsonSerializerOptions.Default); var serializer = new CacheSerializer(options); services.AddTransient(_ => serializer); - + configure?.Invoke(options); return services; diff --git a/source/src/MyTelegram.Caching.Redis/RedisHelper.cs b/source/src/MyTelegram.Caching.Redis/RedisHelper.cs new file mode 100644 index 000000000..304ce10fd --- /dev/null +++ b/source/src/MyTelegram.Caching.Redis/RedisHelper.cs @@ -0,0 +1,48 @@ +using StackExchange.Redis; + +namespace MyTelegram.Caching.Redis +{ + public class RedisHelper : IRedisHelper + { + private readonly IDatabase _db; + + public RedisHelper(IConnectionMultiplexer multiplexer) + { + _db = multiplexer.GetDatabase(); + } + + /// + /// Atomically sets the key only if it does not already exist (SET NX) + /// + public async Task SetIfNotExistsAsync(string key, byte[] value, TimeSpan? expiry = null) + { + return await _db.StringSetAsync( + key: key, + value: value, + expiry: expiry, + when: When.NotExists // NX + ).ConfigureAwait(false); + } + + public async Task GetAsync(string key) + { + var val = await _db.StringGetAsync(key).ConfigureAwait(false); + return val.HasValue ? (byte[]?)val! : null; + } + + public async Task DeleteAsync(string key) + { + return await _db.KeyDeleteAsync(key).ConfigureAwait(false); + } + + public async Task ExistsAsync(string key) + { + return await _db.KeyExistsAsync(key).ConfigureAwait(false); + } + + public async Task IncrementAsync(string key, long value = 1) + { + return await _db.StringIncrementAsync(key, value).ConfigureAwait(false); + } + } +} diff --git a/source/src/MyTelegram.Domain.Shared/MyTelegramConsts.cs b/source/src/MyTelegram.Domain.Shared/MyTelegramConsts.cs index 9553ee912..fc2bc4c9f 100644 --- a/source/src/MyTelegram.Domain.Shared/MyTelegramConsts.cs +++ b/source/src/MyTelegram.Domain.Shared/MyTelegramConsts.cs @@ -59,6 +59,7 @@ public class MyTelegramConsts public const string BlockedCuckooFilterKey = "MyTelegarm.CuckooFilter.Blocked"; public const string UserNameCuckooFilterKey = "MyTelegarm.CuckooFilter.UserName"; public const string ChannelMessageViewsBloomFilterKey = "MyTelegarm.BloomFilter.ChannelMessageViews"; + public const int ChannelMessageViewsTtl = 365; public static int EditTimeLimit { get; set; } = 172800; public const int QuickRepliesLimit = 100; diff --git a/source/src/MyTelegram.Messenger/Handlers/LatestLayer/Messages/GetMessagesViewsHandler.cs b/source/src/MyTelegram.Messenger/Handlers/LatestLayer/Messages/GetMessagesViewsHandler.cs index d745bdaac..782b531bc 100644 --- a/source/src/MyTelegram.Messenger/Handlers/LatestLayer/Messages/GetMessagesViewsHandler.cs +++ b/source/src/MyTelegram.Messenger/Handlers/LatestLayer/Messages/GetMessagesViewsHandler.cs @@ -1,4 +1,7 @@ -namespace MyTelegram.Messenger.Handlers.LatestLayer.Messages; +using IMessageViews = MyTelegram.Schema.Messages.IMessageViews; +using TMessageViews = MyTelegram.Schema.Messages.TMessageViews; + +namespace MyTelegram.Messenger.Handlers.LatestLayer.Messages; /// /// Get and increase the view counter of a message sent or forwarded from a channel @@ -11,56 +14,33 @@ /// 400 PEER_ID_INVALID The provided peer id is invalid. /// See /// -internal sealed class GetMessagesViewsHandler : RpcResultObjectHandler +internal sealed class GetMessagesViewsHandler( + IPeerHelper peerHelper, + IQueryProcessor queryProcessor, + IChannelMessageViewsAppService channelMessageViewsAppService, + IAccessHashHelper accessHashHelper) : RpcResultObjectHandler { - private readonly IAccessHashHelper _accessHashHelper; - private readonly IChannelMessageViewsAppService _channelMessageViewsAppService; - private readonly IPeerHelper _peerHelper; - private readonly IQueryProcessor _queryProcessor; - - public GetMessagesViewsHandler( - IPeerHelper peerHelper, - IQueryProcessor queryProcessor, - IChannelMessageViewsAppService channelMessageViewsAppService, - IAccessHashHelper accessHashHelper) - { - _peerHelper = peerHelper; - _queryProcessor = queryProcessor; - _channelMessageViewsAppService = channelMessageViewsAppService; - _accessHashHelper = accessHashHelper; - } - - protected override async Task HandleCoreAsync(IRequestInput input, + protected override async Task HandleCoreAsync(IRequestInput input, RequestGetMessagesViews obj) { - await _accessHashHelper.CheckAccessHashAsync(input, obj.Peer); - // todo:increment==false,only execute query - var peer = _peerHelper.GetPeer(obj.Peer, input.UserId); + await accessHashHelper.CheckAccessHashAsync(input, obj.Peer); + + var peer = peerHelper.GetPeer(obj.Peer, input.UserId); if (peer.PeerType == PeerType.Channel) { - if (obj.Id.Max() < 0) - { - return new MyTelegram.Schema.Messages.TMessageViews + if (obj.Id.Max() <= 0) + return new TMessageViews { Views = new TVector(obj.Id.Select(p => new Schema.TMessageViews { Views = 1 }) .ToList()), Chats = new TVector(), Users = new TVector() }; - } - //var id = obj.Id.First(p => p > 0); - //var command = new StartIncrementViewsCommand( - // MessageBoxId.Create(peer.PeerId, id), - // input.ReqMsgId, - // input.UserId, - // obj.Id, Guid.NewGuid()); - //await _commandBus.PublishAsync(command, CancellationToken.None); - //return null!; - var views = await _channelMessageViewsAppService - .GetMessageViewsAsync(input.UserId, peer.PeerId, obj.Id.ToList()) - ; - return new MyTelegram.Schema.Messages.TMessageViews + var views = await channelMessageViewsAppService + .GetMessageViewsAsync(input.UserId, peer.PeerId, obj.Id.ToList(), obj.Increment); + + return new TMessageViews { Chats = new TVector(), Users = new TVector(), @@ -69,10 +49,10 @@ public GetMessagesViewsHandler( } var boxIdList = obj.Id.Select(p => MessageId.Create(input.UserId, p).Value).ToList(); - var messages = await _queryProcessor - .ProcessAsync(new GetMessagesByIdListQuery(boxIdList), default); + var messages = await queryProcessor + .ProcessAsync(new GetMessagesByIdListQuery(boxIdList), CancellationToken.None); var dict = messages.ToDictionary(k => k.MessageId, v => v); - return new MyTelegram.Schema.Messages.TMessageViews + return new TMessageViews { Chats = new TVector(), Users = new TVector(), @@ -91,6 +71,5 @@ public GetMessagesViewsHandler( }; })) }; - throw new NotImplementedException(); } -} +} \ No newline at end of file diff --git a/source/src/MyTelegram.Messenger/Services/Impl/ChannelMessageViewsAppService.cs b/source/src/MyTelegram.Messenger/Services/Impl/ChannelMessageViewsAppService.cs index 39cacc288..db6af42b3 100644 --- a/source/src/MyTelegram.Messenger/Services/Impl/ChannelMessageViewsAppService.cs +++ b/source/src/MyTelegram.Messenger/Services/Impl/ChannelMessageViewsAppService.cs @@ -1,96 +1,107 @@ using EventFlow.Exceptions; +using MyTelegram.Caching.Redis; using MyTelegram.Messenger.Services.Filters; using IMessageViews = MyTelegram.Schema.IMessageViews; +using TMessageViews = MyTelegram.Schema.TMessageViews; namespace MyTelegram.Messenger.Services.Impl; public class ChannelMessageViewsAppService( IQueryProcessor queryProcessor, ICommandBus commandBus, - ICuckooFilter cuckooFilter) + ICuckooFilter cuckooFilter, + IRedisHelper redisHelper) : IChannelMessageViewsAppService, ITransientDependency { public async Task IncrementViewsIfNotIncrementedAsync(long selfUserId, long channelId, int messageId) { - var key = GetFilterKey(selfUserId, channelId, messageId); - var isExists = await cuckooFilter.ExistsAsync(key); - if (!isExists) - { - await cuckooFilter.AddAsync(key); - } + var keyInfo = CreateFilterKey(selfUserId, channelId, messageId); + await TryRegisterViewAsync(keyInfo.RedisKey, keyInfo.FilterKey); } - private byte[] GetFilterKey(long selfUserId, + private (string RedisKey, byte[] FilterKey) CreateFilterKey(long selfUserId, long channelId, - int messageId) => - Encoding.UTF8.GetBytes( - $"{MyTelegramConsts.ChannelMessageViewsBloomFilterKey}_{selfUserId}_{channelId}_{messageId}"); + int messageId) + { + var filterKey = + $"{MyTelegramConsts.ChannelMessageViewsBloomFilterKey}_{selfUserId}_{channelId}_{messageId}"; - public async Task> GetMessageViewsAsync(long selfUserId, + var redisKey = + $"view:{selfUserId}:{channelId}:{messageId}"; + + return (redisKey, Encoding.UTF8.GetBytes(filterKey)); + } + + public async Task> GetMessageViewsAsync( + long selfUserId, long channelId, - List messageIdList) + List messageIdList, + bool increment = false) { - var messageIdGreaterThanZeroList = messageIdList.Where(p => p > 0).ToList(); - var keyList = messageIdGreaterThanZeroList - .Select(p => GetFilterKey(selfUserId, channelId, p)).ToList(); + if (messageIdList.Count == 0) return new List(); - var needIncrementMessageIdList = new List(); - var index = 0; + var ids = messageIdList.Where(p => p > 0).ToList(); + if (ids.Count == 0) + return messageIdList.Select(_ => (IMessageViews)new TMessageViews { Views = 0 }).ToList(); - foreach (var key in keyList) + // 1) Dedup + collect to increment + HashSet needInc = []; + if (increment) { - var isExists = await cuckooFilter.ExistsAsync(key); - if (!isExists) - { - await cuckooFilter.AddAsync(key); - needIncrementMessageIdList.Add(messageIdGreaterThanZeroList[index]); - } - index++; - } + var pairs = ids.Select(m => (MessageId: m, Key: CreateFilterKey(selfUserId, channelId, m))).ToList(); - var messageViews = (await queryProcessor - .ProcessAsync(new GetMessageViewsQuery(channelId, messageIdGreaterThanZeroList), default) - .ConfigureAwait(false)) - .ToDictionary(k => k.MessageId, v => v) - ; - - foreach (var messageId in needIncrementMessageIdList) - { - try + // batch calls (parallel awaits instead of sequential) + var tasks = pairs.Select(async p => { - var command = new IncrementViewsCommand(MessageId.Create(channelId, messageId)); - await commandBus.PublishAsync(command); - } - catch (DomainError) - { - // - } + var firstTime = await TryRegisterViewAsync(p.Key.RedisKey, p.Key.FilterKey).ConfigureAwait(false); + if (firstTime) needInc.Add(p.MessageId); + }); + await Task.WhenAll(tasks).ConfigureAwait(false); } - var linkedChannelId = await queryProcessor.ProcessAsync(new GetLinkedChannelIdQuery(channelId)); + // 2) Read current views (one query) + var viewsDict = (await queryProcessor + .ProcessAsync(new GetMessageViewsQuery(channelId, ids), CancellationToken.None) + .ConfigureAwait(false)) + .ToDictionary(v => v.MessageId); - var replies = (await queryProcessor.ProcessAsync(new GetRepliesQuery(channelId, messageIdList))) - .ToDictionary(k => k.MessageId, v => v); + // 3) Increment for first-time viewers (robust, with optional rollback) + if (increment && needInc.Count > 0) + foreach (var mid in needInc) + try + { + await commandBus.PublishAsync(new IncrementViewsCommand(MessageId.Create(channelId, mid))) + .ConfigureAwait(false); + } + catch (DomainError) + { + } + + // 4) Replies & projection + var linkedChannelId = await queryProcessor + .ProcessAsync(new GetLinkedChannelIdQuery(channelId), CancellationToken.None) + .ConfigureAwait(false); + var repliesDict = (await queryProcessor + .ProcessAsync(new GetRepliesQuery(channelId, messageIdList), CancellationToken.None) + .ConfigureAwait(false)) + .ToDictionary(r => r.MessageId); - var messageViewsToClient = new List(); + var result = new List(messageIdList.Count); foreach (var messageId in messageIdList) { - var needIncrement = needIncrementMessageIdList.Contains(messageId); - if (messageViews.TryGetValue(messageId, out var views)) + var addOne = increment && needInc.Contains(messageId); + if (viewsDict.TryGetValue(messageId, out var views)) { - replies.TryGetValue(messageId, out var reply); - var recentRepliers = new List();// = reply?.RecentRepliers.Select(p => p.ToPeer()); - if (reply?.RecentRepliers?.Count > 0) - { - recentRepliers.AddRange(reply.RecentRepliers.Select(peer => peer.ToPeer())); - } + repliesDict.TryGetValue(messageId, out var reply); + var recentRepliers = reply?.RecentRepliers?.Count > 0 + ? reply.RecentRepliers.Select(p => p.ToPeer()).ToList() + : new List(); - messageViewsToClient.Add(new Schema.TMessageViews + result.Add(new TMessageViews { - Views = needIncrement ? views.Views + 1 : views.Views, - //Replies = new TMessageReplies { ChannelId = channelId } + Views = addOne ? views.Views + 1 : views.Views, Replies = new TMessageReplies { ChannelId = reply?.CommentChannelId ?? linkedChannelId, @@ -104,10 +115,33 @@ public async Task> GetMessageViewsAsync(long selfUserId, } else { - messageViewsToClient.Add(new Schema.TMessageViews { Views = 0 }); + result.Add(new TMessageViews { Views = addOne ? 1 : 0 }); } } - return messageViewsToClient; + return result; + } + + + private async Task TryRegisterViewAsync(string redisKey, byte[] filterKey) + { + // Local check: if Cuckoo filter already saw it, skip + if (await cuckooFilter.ExistsAsync(filterKey)) + return false; + + // Atomic Redis SET NX with TTL + var isNew = await redisHelper.SetIfNotExistsAsync(redisKey, filterKey, + TimeSpan.FromDays(MyTelegramConsts.ChannelMessageViewsTtl)); + + if (!isNew) + { + // Key already existed → record in Cuckoo for faster local lookup next time + await cuckooFilter.AddAsync(filterKey); + return false; + } + + // First time view → add to local Cuckoo + await cuckooFilter.AddAsync(filterKey); + return true; } } \ No newline at end of file diff --git a/source/src/MyTelegram.Messenger/Services/Interfaces/IChannelMessageViewsAppService.cs b/source/src/MyTelegram.Messenger/Services/Interfaces/IChannelMessageViewsAppService.cs index 772f88675..e029f59a3 100644 --- a/source/src/MyTelegram.Messenger/Services/Interfaces/IChannelMessageViewsAppService.cs +++ b/source/src/MyTelegram.Messenger/Services/Interfaces/IChannelMessageViewsAppService.cs @@ -9,5 +9,5 @@ Task IncrementViewsIfNotIncrementedAsync(long selfUserId, int messageId); Task> GetMessageViewsAsync(long selfUserId, long channelId, - List messageIdList); + List messageIdList, bool increment = false); } \ No newline at end of file From c94e2a0c0d55843483f928fe40a28bd2518ac3d9 Mon Sep 17 00:00:00 2001 From: Ali Khadivi Date: Thu, 30 Oct 2025 11:11:42 +0330 Subject: [PATCH 5/6] Rollback channel message views --- .../MyTelegramCachingExtensions.cs | 36 +++------ .../MyTelegramConsts.cs | 1 - .../Impl/ChannelMessageViewsAppService.cs | 80 +++++-------------- 3 files changed, 28 insertions(+), 89 deletions(-) diff --git a/source/src/MyTelegram.Caching.Redis/MyTelegramCachingExtensions.cs b/source/src/MyTelegram.Caching.Redis/MyTelegramCachingExtensions.cs index c644641a8..3d4927caf 100644 --- a/source/src/MyTelegram.Caching.Redis/MyTelegramCachingExtensions.cs +++ b/source/src/MyTelegram.Caching.Redis/MyTelegramCachingExtensions.cs @@ -1,38 +1,22 @@ -using System.Text.Json; +using Microsoft.Extensions.Caching.Distributed; +using MyTelegram.Core; +using System.Text.Json; +using System.Text; using Microsoft.Extensions.Caching.StackExchangeRedis; using Microsoft.Extensions.DependencyInjection; -using MyTelegram.Core; -using StackExchange.Redis; namespace MyTelegram.Caching.Redis; public static class MyTelegramCachingExtensions { - public static IServiceCollection AddMyTelegramStackExchangeRedisCache( - this IServiceCollection services, - Action? configureCache = null) + public static IServiceCollection AddMyTelegramStackExchangeRedisCache(this IServiceCollection services, Action? options = null) { - // Register distributed cache (high-level) - services.AddStackExchangeRedisCache(options => { configureCache?.Invoke(options); }); - - // Register the low-level Redis multiplexer (for atomic ops) - services.AddSingleton(sp => + //services.AddTransient(); + services.AddSingleton(typeof(ICacheManager<>), typeof(CacheManager<>)); + services.AddStackExchangeRedisCache(redisOptions => { - var opts = new RedisCacheOptions(); - configureCache?.Invoke(opts); - - if (string.IsNullOrWhiteSpace(opts.Configuration)) - throw new InvalidOperationException("Redis connection string not set!"); - - var cfg = ConfigurationOptions.Parse(opts.Configuration); - - return ConnectionMultiplexer.Connect(cfg); + options?.Invoke(redisOptions); }); - - // Register cache manager - services.AddSingleton(typeof(ICacheManager<>), typeof(CacheManager<>)); - services.AddSingleton(); - return services; } @@ -42,7 +26,7 @@ public static IServiceCollection AddCacheJsonSerializer(this IServiceCollection var options = new JsonSerializerOptions(JsonSerializerOptions.Default); var serializer = new CacheSerializer(options); services.AddTransient(_ => serializer); - + configure?.Invoke(options); return services; diff --git a/source/src/MyTelegram.Domain.Shared/MyTelegramConsts.cs b/source/src/MyTelegram.Domain.Shared/MyTelegramConsts.cs index fc2bc4c9f..9553ee912 100644 --- a/source/src/MyTelegram.Domain.Shared/MyTelegramConsts.cs +++ b/source/src/MyTelegram.Domain.Shared/MyTelegramConsts.cs @@ -59,7 +59,6 @@ public class MyTelegramConsts public const string BlockedCuckooFilterKey = "MyTelegarm.CuckooFilter.Blocked"; public const string UserNameCuckooFilterKey = "MyTelegarm.CuckooFilter.UserName"; public const string ChannelMessageViewsBloomFilterKey = "MyTelegarm.BloomFilter.ChannelMessageViews"; - public const int ChannelMessageViewsTtl = 365; public static int EditTimeLimit { get; set; } = 172800; public const int QuickRepliesLimit = 100; diff --git a/source/src/MyTelegram.Messenger/Services/Impl/ChannelMessageViewsAppService.cs b/source/src/MyTelegram.Messenger/Services/Impl/ChannelMessageViewsAppService.cs index 03330cc22..0c2e9cba7 100644 --- a/source/src/MyTelegram.Messenger/Services/Impl/ChannelMessageViewsAppService.cs +++ b/source/src/MyTelegram.Messenger/Services/Impl/ChannelMessageViewsAppService.cs @@ -1,7 +1,6 @@ using MyTelegram.Messenger.Services.Filters; using System.Buffers.Binary; using IMessageViews = MyTelegram.Schema.IMessageViews; -using TMessageViews = MyTelegram.Schema.TMessageViews; namespace MyTelegram.Messenger.Services.Impl; @@ -32,8 +31,7 @@ public static void GenerateFilterKey(long selfUserId, long channelId, int messag public async Task> GetMessageViewsAsync(long selfUserId, long authKeyId, long channelId, - List messageIdList, - bool increment = false) + List messageIdList) { var messageIdGreaterThanZeroList = messageIdList.Where(p => p > 0).ToList(); @@ -78,47 +76,28 @@ public async Task> GetMessageViewsAsync(long selfUserId, } } - // 2) Read current views (one query) - var viewsDict = (await queryProcessor - .ProcessAsync(new GetMessageViewsQuery(channelId, ids), CancellationToken.None) - .ConfigureAwait(false)) - .ToDictionary(v => v.MessageId); + var linkedChannelId = await queryProcessor.ProcessAsync(new GetLinkedChannelIdQuery(channelId)); - // 3) Increment for first-time viewers (robust, with optional rollback) - if (increment && needInc.Count > 0) - foreach (var mid in needInc) - try - { - await commandBus.PublishAsync(new IncrementViewsCommand(MessageId.Create(channelId, mid))) - .ConfigureAwait(false); - } - catch (DomainError) - { - } + var replies = (await queryProcessor.ProcessAsync(new GetRepliesQuery(channelId, messageIdList))) + .ToDictionary(k => k.MessageId, v => v); - // 4) Replies & projection - var linkedChannelId = await queryProcessor - .ProcessAsync(new GetLinkedChannelIdQuery(channelId), CancellationToken.None) - .ConfigureAwait(false); - var repliesDict = (await queryProcessor - .ProcessAsync(new GetRepliesQuery(channelId, messageIdList), CancellationToken.None) - .ConfigureAwait(false)) - .ToDictionary(r => r.MessageId); - - var result = new List(messageIdList.Count); + var messageViewsToClient = new List(); foreach (var messageId in messageIdList) { - var addOne = increment && needInc.Contains(messageId); - if (viewsDict.TryGetValue(messageId, out var views)) + var needIncrement = needIncrementMessageIdList.Contains(messageId); + if (messageViews.TryGetValue(messageId, out var views)) { - repliesDict.TryGetValue(messageId, out var reply); - var recentRepliers = reply?.RecentRepliers?.Count > 0 - ? reply.RecentRepliers.Select(p => p.ToPeer()).ToList() - : new List(); + replies.TryGetValue(messageId, out var reply); + var recentRepliers = new List();// = reply?.RecentRepliers.Select(p => p.ToPeer()); + if (reply?.RecentRepliers?.Count > 0) + { + recentRepliers.AddRange(reply.RecentRepliers.Select(peer => peer.ToPeer())); + } - result.Add(new TMessageViews + messageViewsToClient.Add(new Schema.TMessageViews { - Views = addOne ? views.Views + 1 : views.Views, + Views = needIncrement ? views.Views + 1 : views.Views, + //Replies = new TMessageReplies { ChannelId = channelId } Replies = new TMessageReplies { ChannelId = reply?.CommentChannelId ?? linkedChannelId, @@ -132,34 +111,11 @@ await commandBus.PublishAsync(new IncrementViewsCommand(MessageId.Create(channel } else { - result.Add(new TMessageViews { Views = addOne ? 1 : 0 }); + messageViewsToClient.Add(new Schema.TMessageViews { Views = 0 }); } } - return result; - } - - - private async Task TryRegisterViewAsync(string redisKey, byte[] filterKey) - { - // Local check: if Cuckoo filter already saw it, skip - if (await cuckooFilter.ExistsAsync(filterKey)) - return false; - - // Atomic Redis SET NX with TTL - var isNew = await redisHelper.SetIfNotExistsAsync(redisKey, filterKey, - TimeSpan.FromDays(MyTelegramConsts.ChannelMessageViewsTtl)); - - if (!isNew) - { - // Key already existed → record in Cuckoo for faster local lookup next time - await cuckooFilter.AddAsync(filterKey); - return false; - } - - // First time view → add to local Cuckoo - await cuckooFilter.AddAsync(filterKey); - return true; + return messageViewsToClient; } public void IncrementViews(long selfUserId, long channelId, int messageId) From 9c10319c292d94f9db2031eca23503dd68a09651 Mon Sep 17 00:00:00 2001 From: Ali Khadivi Date: Thu, 30 Oct 2025 11:14:00 +0330 Subject: [PATCH 6/6] Rollback channel message views 2 --- .../MyTelegram.Caching.Redis/IRedisHelper.cs | 11 ----- .../MyTelegram.Caching.Redis/RedisHelper.cs | 48 ------------------- 2 files changed, 59 deletions(-) delete mode 100644 source/src/MyTelegram.Caching.Redis/IRedisHelper.cs delete mode 100644 source/src/MyTelegram.Caching.Redis/RedisHelper.cs diff --git a/source/src/MyTelegram.Caching.Redis/IRedisHelper.cs b/source/src/MyTelegram.Caching.Redis/IRedisHelper.cs deleted file mode 100644 index 5189e747f..000000000 --- a/source/src/MyTelegram.Caching.Redis/IRedisHelper.cs +++ /dev/null @@ -1,11 +0,0 @@ -namespace MyTelegram.Caching.Redis -{ - public interface IRedisHelper - { - Task SetIfNotExistsAsync(string key, byte[] value, TimeSpan? expiry = null); - Task GetAsync(string key); - Task DeleteAsync(string key); - Task ExistsAsync(string key); - Task IncrementAsync(string key, long value = 1); - } -} diff --git a/source/src/MyTelegram.Caching.Redis/RedisHelper.cs b/source/src/MyTelegram.Caching.Redis/RedisHelper.cs deleted file mode 100644 index 304ce10fd..000000000 --- a/source/src/MyTelegram.Caching.Redis/RedisHelper.cs +++ /dev/null @@ -1,48 +0,0 @@ -using StackExchange.Redis; - -namespace MyTelegram.Caching.Redis -{ - public class RedisHelper : IRedisHelper - { - private readonly IDatabase _db; - - public RedisHelper(IConnectionMultiplexer multiplexer) - { - _db = multiplexer.GetDatabase(); - } - - /// - /// Atomically sets the key only if it does not already exist (SET NX) - /// - public async Task SetIfNotExistsAsync(string key, byte[] value, TimeSpan? expiry = null) - { - return await _db.StringSetAsync( - key: key, - value: value, - expiry: expiry, - when: When.NotExists // NX - ).ConfigureAwait(false); - } - - public async Task GetAsync(string key) - { - var val = await _db.StringGetAsync(key).ConfigureAwait(false); - return val.HasValue ? (byte[]?)val! : null; - } - - public async Task DeleteAsync(string key) - { - return await _db.KeyDeleteAsync(key).ConfigureAwait(false); - } - - public async Task ExistsAsync(string key) - { - return await _db.KeyExistsAsync(key).ConfigureAwait(false); - } - - public async Task IncrementAsync(string key, long value = 1) - { - return await _db.StringIncrementAsync(key, value).ConfigureAwait(false); - } - } -}