Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -156,10 +156,10 @@ int date
toPeer, date));
}

public void ReadMention(int messageId)
public void ReadMention(int messageId, bool readAllMentions = false)
{
Specs.AggregateIsCreated.ThrowDomainErrorIfNotSatisfied(this);
var unreadMentionsCount = _state.UnreadMentionsCount - 1;
var unreadMentionsCount = readAllMentions ? 0 : _state.UnreadMentionsCount - 1;
if (unreadMentionsCount < 0)
{
unreadMentionsCount = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ public class ReadMentionCommandHandler : CommandHandler<DialogAggregate, DialogI
{
public override Task ExecuteAsync(DialogAggregate aggregate, ReadMentionCommand command, CancellationToken cancellationToken)
{
aggregate.ReadMention(command.MessageId);
aggregate.ReadMention(command.MessageId, command.ReadAllMentions);

return Task.CompletedTask;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
namespace MyTelegram.Domain.Commands.Dialog;

public class ReadMentionCommand(DialogId aggregateId, long ownerUserId, int messageId)
public class ReadMentionCommand(DialogId aggregateId, long ownerUserId, int messageId, bool readAllMentions = false)
: Command<DialogAggregate, DialogId, IExecutionResult>(aggregateId)
{
public long OwnerUserId { get; } = ownerUserId;

//public long ToPeerId { get; }
public int MessageId { get; } = messageId;

public bool ReadAllMentions { get; } = readAllMentions;

/*long toPeerId,*/
//ToPeerId = toPeerId;
}
10 changes: 10 additions & 0 deletions source/src/MyTelegram.Domain/Sagas/SendMessageSaga.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public class SendMessageSaga : MyInMemoryAggregateSaga<SendMessageSaga, SendMess
ISagaHandles<MessageAggregate, MessageId, ReplyChannelMessageCompletedEvent>
{
private readonly IIdGenerator _idGenerator;
private bool _draftCleared;
private readonly SendMessageSagaState _state = new();
public SendMessageSaga(SendMessageSagaId id, IEventStore eventStore, IIdGenerator idGenerator) : base(id, eventStore)
{
Expand Down Expand Up @@ -121,6 +122,15 @@ public async Task HandleAsync(IDomainEvent<MessageAggregate, MessageId, OutboxMe
domainEvent.AggregateEvent.ReplyToMsgItems,
domainEvent.AggregateEvent.ChatMembers
));

if (!_draftCleared && domainEvent.AggregateEvent.ClearDraft && _state.ClearDraft)
{
Publish(new ClearDraftCommand(
DialogId.Create(domainEvent.AggregateEvent.OutboxMessageItem.SenderPeer.PeerId,
domainEvent.AggregateEvent.OutboxMessageItem.ToPeer),
domainEvent.AggregateEvent.RequestInfo with { RequestId = Guid.NewGuid() }));
_draftCleared = true;
}
await HandleSendOutboxMessageCompletedAsync(domainEvent.AggregateEvent.OutboxMessageItem);

await CreateInboxMessageAsync(domainEvent.AggregateEvent);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
using MyTelegram.Domain.Aggregates.Device;
using MyTelegram.Domain.Commands.Device;

namespace MyTelegram.Messenger.CommandServer.EventHandlers;

public class LogOutEventHandler(
ICommandBus commandBus
) : IEventHandler<UserLoggedOutEvent>,
ITransientDependency
{
public async Task HandleEventAsync(UserLoggedOutEvent eventData)
{
var command = new UnRegisterDeviceForAuthKeyCommand(
DeviceId.Create(eventData.PermAuthKeyId),
eventData.PermAuthKeyId,
eventData.TempAuthKeyId);
await commandBus.PublishAsync(command, CancellationToken.None);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ public static void AddEventHandlers(this IServiceCollection services)
services.AddSubscription<RpcMessageHasSentEvent, PtsEventHandler>();
services.AddSubscription<AcksDataReceivedEvent, PtsEventHandler>();

services.AddSubscription<UserLoggedOutEvent, LogOutEventHandler>();
}

public static void AddMyTelegramMessengerCommandServer(this IServiceCollection services,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,35 @@
/// 400 PEER_ID_INVALID The provided peer id is invalid.
/// See <a href="https://corefork.telegram.org/method/messages.readMentions" />
///</summary>
internal sealed class ReadMentionsHandler : RpcResultObjectHandler<MyTelegram.Schema.Messages.RequestReadMentions, MyTelegram.Schema.Messages.IAffectedHistory>
internal sealed class ReadMentionsHandler(
ICommandBus commandBus,
IPeerHelper peerHelper,
IAccessHashHelper accessHashHelper,
IQueryProcessor queryProcessor,
IPtsHelper ptsHelper)
: RpcResultObjectHandler<MyTelegram.Schema.Messages.RequestReadMentions, MyTelegram.Schema.Messages.IAffectedHistory>
{
protected override Task<MyTelegram.Schema.Messages.IAffectedHistory> HandleCoreAsync(IRequestInput input,
protected override async Task<MyTelegram.Schema.Messages.IAffectedHistory> HandleCoreAsync(IRequestInput input,
MyTelegram.Schema.Messages.RequestReadMentions obj)
{
throw new NotImplementedException();
await accessHashHelper.CheckAccessHashAsync(input, obj.Peer);
var peer = peerHelper.GetPeer(obj.Peer, input.UserId);
var dialogId = DialogId.Create(input.UserId, peer);

var dialogReadModel = await queryProcessor.ProcessAsync(new GetDialogByIdQuery(dialogId.Value));

if (dialogReadModel != null && dialogReadModel.UnreadMentionsCount > 0)
{
var messageId = obj.TopMsgId ?? dialogReadModel.TopMessage;
var command = new ReadMentionCommand(dialogId, input.UserId, messageId, true);
await commandBus.PublishAsync(command);
}

return new TAffectedHistory
{
Pts = ptsHelper.GetCachedPts(input.UserId),

@loyldg loyldg Oct 7, 2025

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If executing the command requires modifying pts, you may need to create a new XXXSaga to handle it. After incrementing pts, publish a DomainEvent, then return the result to the client in the DomainEventHandler. You can refer to EditMessageSaga/ClearHistorySaga as an example.

PtsCount = 0,
Offset = 0
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,25 +72,35 @@ internal sealed class SendMessageHandler(
: RpcResultObjectHandler<RequestSendMessage, IUpdates>
{
protected override async Task<IUpdates> HandleCoreAsync(IRequestInput input,
RequestSendMessage obj)
RequestSendMessage obj)
{
await accessHashHelper.CheckAccessHashAsync(input, obj.Peer);
await accessHashHelper.CheckAccessHashAsync(input, obj.SendAs);
var media = await ProcessUrlsInMessageAsync(obj);

var media = await messageAppService.CreateInvitePreviewIfAnyAsync(
obj.Message,
options.Value.JoinChatDomain);

if (obj.Message.StartsWith("/"))
{
obj.Entities ??= [];
obj.Entities.Add(new TMessageEntityBotCommand
var match = Regex.Match(obj.Message, @"^/([A-Za-z0-9_]{1,64})\b");

if (match.Success)
{
Length = obj.Message.Length,
Offset = 0
});
obj.Entities ??= [];
obj.Entities.Add(new TMessageEntityBotCommand
{
Offset = 0,
Length = match.Length // includes leading slash
});
}
}

int? topMsgId = null;

var sendAs = peerHelper.GetPeer(obj.SendAs, input.UserId);
var sendMessageInput = new SendMessageInput(input.ToRequestInfo(),
var sendMessageInput = new SendMessageInput(
input.ToRequestInfo(),
input.UserId,
peerHelper.GetPeer(obj.Peer, input.UserId),
obj.Message,
Expand All @@ -112,49 +122,4 @@ protected override async Task<IUpdates> HandleCoreAsync(IRequestInput input,

return null!;
}
private async Task<TMessageMediaWebPage?> ProcessUrlsInMessageAsync(RequestSendMessage obj)
{
var pattern = @"(?:^|\s)(https?://[^\s]+)(?=\s|$)";
var pattern2 = @$"{options.Value.JoinChatDomain}/\+([\S]{{16}})";
var matches = Regex.Matches(obj.Message, pattern);
var isInviteUrlAdded = false;
TMessageMediaWebPage? media = null;
foreach (Match match in matches)
{
obj.Entities ??= [];
var url = match.Groups[1].Value;
var m2 = Regex.Match(url, pattern2);
if (m2.Success && !isInviteUrlAdded)
{
var link = m2.Groups[1].Value;
var chatInvite = await queryProcessor.ProcessAsync(new GetChatInviteByLinkQuery(link));
if (chatInvite != null)
{
var channelReadModel = await channelAppService.GetAsync(chatInvite.PeerId);
// Super group/Public channel
if (!channelReadModel.Broadcast ||
(channelReadModel.Broadcast && !string.IsNullOrEmpty(channelReadModel.UserName)))
{
media = new TMessageMediaWebPage
{
Webpage = new Schema.TWebPage
{
Id = Random.Shared.NextInt64(),
Url = $"{options.Value.JoinChatDomain}/+{link}",
DisplayUrl = $"{options.Value.JoinChatDomain}/+{link}",
Type = channelReadModel.Broadcast ? "telegram_channel" : "telegram_megagroup",
SiteName = "MyTelegram",
Title = channelReadModel.Title,
Description = $"Join this group on MyTelegram.",
}
};
}

isInviteUrlAdded = true;
}
}
}

return media;
}
}
Loading