concord/internal/dm/service.go

850 lines
22 KiB
Go

package dm
import (
"context"
"strconv"
commonv1 "github.com/Alexander-D-Karpov/concord/api/gen/go/common/v1"
streamv1 "github.com/Alexander-D-Karpov/concord/api/gen/go/stream/v1"
"github.com/Alexander-D-Karpov/concord/internal/auth/interceptor"
"github.com/Alexander-D-Karpov/concord/internal/common/errors"
"github.com/Alexander-D-Karpov/concord/internal/events"
"github.com/Alexander-D-Karpov/concord/internal/users"
"github.com/Alexander-D-Karpov/concord/internal/voiceassign"
"github.com/google/uuid"
"go.uber.org/zap"
"google.golang.org/protobuf/types/known/timestamppb"
)
type Service struct {
repo *Repository
msgRepo *MessageRepository
usersRepo *users.Repository
hub *events.Hub
voiceAssign *voiceassign.Service
presence *users.PresenceManager
logger *zap.Logger
}
func NewService(
repo *Repository,
msgRepo *MessageRepository,
usersRepo *users.Repository,
hub *events.Hub,
voiceAssign *voiceassign.Service,
presence *users.PresenceManager,
logger *zap.Logger,
) *Service {
return &Service{
repo: repo,
msgRepo: msgRepo,
usersRepo: usersRepo,
hub: hub,
voiceAssign: voiceAssign,
presence: presence,
logger: logger,
}
}
func (s *Service) decorateChannelStatuses(channels []*DMChannelWithUser) {
for _, ch := range channels {
presence := users.StatusOffline
if s.presence != nil {
presence = s.presence.GetStatus(ch.OtherUserID)
}
ch.OtherUserStatus = users.EffectiveStatus(
users.NormalizeStatusPreference(ch.OtherUserStatus),
presence,
)
}
}
func (s *Service) GetOrCreateDM(ctx context.Context, otherUserID string) (*DMChannel, error) {
userID := interceptor.GetUserID(ctx)
if userID == "" {
return nil, errors.Unauthorized("user not authenticated")
}
user1UUID, err := uuid.Parse(userID)
if err != nil {
return nil, errors.BadRequest("invalid user id")
}
user2UUID, err := uuid.Parse(otherUserID)
if err != nil {
return nil, errors.BadRequest("invalid other user id")
}
if user1UUID == user2UUID {
return nil, errors.BadRequest("cannot create DM with yourself")
}
_, err = s.usersRepo.GetByID(ctx, user2UUID)
if err != nil {
return nil, errors.NotFound("user not found")
}
channel, err := s.repo.GetOrCreate(ctx, user1UUID, user2UUID)
if err != nil {
return nil, errors.Internal("failed to create DM channel", err)
}
return channel, nil
}
func (s *Service) ListDMs(ctx context.Context) ([]*DMChannelWithUser, error) {
userID := interceptor.GetUserID(ctx)
if userID == "" {
return nil, errors.Unauthorized("user not authenticated")
}
userUUID, err := uuid.Parse(userID)
if err != nil {
return nil, errors.BadRequest("invalid user id")
}
channels, err := s.repo.ListByUser(ctx, userUUID)
if err != nil {
return nil, err
}
s.decorateChannelStatuses(channels)
return channels, nil
}
func (s *Service) CloseDM(ctx context.Context, channelID string) error {
userID := interceptor.GetUserID(ctx)
if userID == "" {
return errors.Unauthorized("user not authenticated")
}
channelUUID, err := uuid.Parse(channelID)
if err != nil {
return errors.BadRequest("invalid channel id")
}
userUUID, err := uuid.Parse(userID)
if err != nil {
return errors.BadRequest("invalid user id")
}
isParticipant, err := s.repo.IsParticipant(ctx, channelUUID, userUUID)
if err != nil {
return errors.Internal("failed to check participation", err)
}
if !isParticipant {
return errors.Forbidden("not a participant of this DM")
}
return s.repo.Delete(ctx, channelUUID)
}
func (s *Service) GetChannel(ctx context.Context, channelID string) (*DMChannel, error) {
channelUUID, err := uuid.Parse(channelID)
if err != nil {
return nil, errors.BadRequest("invalid channel id")
}
return s.repo.GetByID(ctx, channelUUID)
}
func (s *Service) checkParticipant(ctx context.Context, channelID string) (uuid.UUID, uuid.UUID, error) {
userID := interceptor.GetUserID(ctx)
if userID == "" {
return uuid.Nil, uuid.Nil, errors.Unauthorized("user not authenticated")
}
channelUUID, err := uuid.Parse(channelID)
if err != nil {
return uuid.Nil, uuid.Nil, errors.BadRequest("invalid channel id")
}
userUUID, err := uuid.Parse(userID)
if err != nil {
return uuid.Nil, uuid.Nil, errors.BadRequest("invalid user id")
}
isParticipant, err := s.repo.IsParticipant(ctx, channelUUID, userUUID)
if err != nil {
return uuid.Nil, uuid.Nil, errors.Internal("failed to check participation", err)
}
if !isParticipant {
return uuid.Nil, uuid.Nil, errors.Forbidden("not a participant")
}
return channelUUID, userUUID, nil
}
func (s *Service) SendMessage(ctx context.Context, channelID, content string, replyToID string, attachments []DMAttachment, mentionUserIDs []string) (*DMMessage, error) {
channelUUID, userUUID, err := s.checkParticipant(ctx, channelID)
if err != nil {
return nil, err
}
msg := &DMMessage{
ChannelID: channelUUID,
AuthorID: userUUID,
Content: content,
Attachments: attachments,
}
if replyToID != "" {
id, err := strconv.ParseInt(replyToID, 10, 64)
if err != nil {
return nil, errors.BadRequest("invalid reply_to_id")
}
msg.ReplyToID = &id
}
if len(mentionUserIDs) > 0 {
for _, id := range mentionUserIDs {
if uid, err := uuid.Parse(id); err == nil {
msg.Mentions = append(msg.Mentions, uid)
}
}
}
if err := s.msgRepo.Create(ctx, msg); err != nil {
return nil, errors.Internal("failed to create message", err)
}
s.broadcastMessageCreated(ctx, channelUUID, msg)
return msg, nil
}
func (s *Service) EditMessage(ctx context.Context, channelID, messageID, content string) (*DMMessage, error) {
_, userUUID, err := s.checkParticipant(ctx, channelID)
if err != nil {
return nil, err
}
msgID, err := strconv.ParseInt(messageID, 10, 64)
if err != nil {
return nil, errors.BadRequest("invalid message id")
}
msg, err := s.msgRepo.GetByID(ctx, msgID)
if err != nil {
return nil, err
}
if msg.AuthorID != userUUID {
return nil, errors.Forbidden("can only edit own messages")
}
msg.Content = content
if err := s.msgRepo.Update(ctx, msg); err != nil {
return nil, err
}
s.broadcastMessageEdited(ctx, msg)
return msg, nil
}
func (s *Service) DeleteMessage(ctx context.Context, channelID, messageID string) error {
_, userUUID, err := s.checkParticipant(ctx, channelID)
if err != nil {
return err
}
msgID, err := strconv.ParseInt(messageID, 10, 64)
if err != nil {
return errors.BadRequest("invalid message id")
}
msg, err := s.msgRepo.GetByID(ctx, msgID)
if err != nil {
return err
}
if msg.AuthorID != userUUID {
return errors.Forbidden("can only delete own messages")
}
if err := s.msgRepo.SoftDelete(ctx, msgID); err != nil {
return err
}
s.broadcastMessageDeleted(ctx, msg.ChannelID, messageID)
return nil
}
func (s *Service) ListMessages(ctx context.Context, channelID string, beforeID, afterID *string, limit int) ([]*DMMessage, error) {
channelUUID, _, err := s.checkParticipant(ctx, channelID)
if err != nil {
return nil, err
}
var beforeIDInt, afterIDInt *int64
if beforeID != nil {
id, err := strconv.ParseInt(*beforeID, 10, 64)
if err != nil {
return nil, errors.BadRequest("invalid before_id")
}
beforeIDInt = &id
}
if afterID != nil {
id, err := strconv.ParseInt(*afterID, 10, 64)
if err != nil {
return nil, errors.BadRequest("invalid after_id")
}
afterIDInt = &id
}
return s.msgRepo.ListByChannel(ctx, channelUUID, beforeIDInt, afterIDInt, limit)
}
func (s *Service) AddReaction(ctx context.Context, channelID, messageID, emoji string) error {
_, userUUID, err := s.checkParticipant(ctx, channelID)
if err != nil {
return err
}
msgID, err := strconv.ParseInt(messageID, 10, 64)
if err != nil {
return errors.BadRequest("invalid message id")
}
msg, err := s.msgRepo.GetByID(ctx, msgID)
if err != nil {
return err
}
reaction, err := s.msgRepo.AddReaction(ctx, msgID, userUUID, emoji)
if err != nil {
return err
}
s.broadcastReactionAdded(ctx, msg.ChannelID, messageID, reaction)
return nil
}
func (s *Service) RemoveReaction(ctx context.Context, channelID, messageID, emoji string) error {
_, userUUID, err := s.checkParticipant(ctx, channelID)
if err != nil {
return err
}
msgID, err := strconv.ParseInt(messageID, 10, 64)
if err != nil {
return errors.BadRequest("invalid message id")
}
msg, err := s.msgRepo.GetByID(ctx, msgID)
if err != nil {
return err
}
reactionID, err := s.msgRepo.RemoveReaction(ctx, msgID, userUUID, emoji)
if err != nil {
return err
}
s.broadcastReactionRemoved(ctx, msg.ChannelID, messageID, reactionID.String(), userUUID.String())
return nil
}
func (s *Service) PinMessage(ctx context.Context, channelID, messageID string) error {
channelUUID, userUUID, err := s.checkParticipant(ctx, channelID)
if err != nil {
return err
}
msgID, err := strconv.ParseInt(messageID, 10, 64)
if err != nil {
return errors.BadRequest("invalid message id")
}
if err := s.msgRepo.PinMessage(ctx, channelUUID, msgID, userUUID); err != nil {
return err
}
s.broadcastMessagePinned(ctx, channelUUID, messageID, userUUID.String())
return nil
}
func (s *Service) UnpinMessage(ctx context.Context, channelID, messageID string) error {
channelUUID, _, err := s.checkParticipant(ctx, channelID)
if err != nil {
return err
}
msgID, err := strconv.ParseInt(messageID, 10, 64)
if err != nil {
return errors.BadRequest("invalid message id")
}
if err := s.msgRepo.UnpinMessage(ctx, channelUUID, msgID); err != nil {
return err
}
s.broadcastMessageUnpinned(ctx, channelUUID, messageID)
return nil
}
func (s *Service) ListPinnedMessages(ctx context.Context, channelID string) ([]*DMMessage, error) {
channelUUID, _, err := s.checkParticipant(ctx, channelID)
if err != nil {
return nil, err
}
return s.msgRepo.ListPinnedMessages(ctx, channelUUID)
}
func (s *Service) GetThread(ctx context.Context, channelID, messageID string, limit int, cursor string) ([]*DMMessage, string, error) {
_, _, err := s.checkParticipant(ctx, channelID)
if err != nil {
return nil, "", err
}
parentID, err := strconv.ParseInt(messageID, 10, 64)
if err != nil {
return nil, "", errors.BadRequest("invalid message id")
}
var offset int64
if cursor != "" {
offset, _ = strconv.ParseInt(cursor, 10, 64)
}
messages, err := s.msgRepo.GetThreadReplies(ctx, parentID, limit+1, int(offset))
if err != nil {
return nil, "", err
}
var nextCursor string
if len(messages) > limit {
messages = messages[:limit]
nextCursor = strconv.FormatInt(offset+int64(limit), 10)
}
return messages, nextCursor, nil
}
func (s *Service) SearchMessages(ctx context.Context, channelID, query string, limit int) ([]*DMMessage, error) {
channelUUID, _, err := s.checkParticipant(ctx, channelID)
if err != nil {
return nil, err
}
if limit <= 0 || limit > 100 {
limit = 50
}
return s.msgRepo.Search(ctx, channelUUID, query, limit)
}
func (s *Service) StartCall(ctx context.Context, channelID string, audioOnly bool) (*voiceassign.VoiceAssignmentResult, string, error) {
channelUUID, userUUID, err := s.checkParticipant(ctx, channelID)
if err != nil {
return nil, "", err
}
existingCall, err := s.repo.GetActiveCall(ctx, channelUUID)
if err != nil {
return nil, "", errors.Internal("failed to check active call", err)
}
if existingCall != nil {
return nil, "", errors.Conflict("call already active")
}
assignment, err := s.voiceAssign.AssignToVoice(ctx, channelID, userUUID.String(), audioOnly)
if err != nil {
return nil, "", errors.Internal("failed to assign voice server", err)
}
serverUUID, _ := uuid.Parse(assignment.ServerID)
call, err := s.repo.CreateCall(ctx, channelUUID, userUUID, &serverUUID)
if err != nil {
return nil, "", errors.Internal("failed to create call", err)
}
s.broadcastCallStarted(ctx, channelUUID, userUUID, audioOnly)
return assignment, call.ID.String(), nil
}
func (s *Service) JoinCall(ctx context.Context, channelID string, audioOnly bool) (*voiceassign.VoiceAssignmentResult, []voiceassign.VoiceParticipant, error) {
channelUUID, userUUID, err := s.checkParticipant(ctx, channelID)
if err != nil {
return nil, nil, err
}
call, err := s.repo.GetActiveCall(ctx, channelUUID)
if err != nil {
return nil, nil, errors.Internal("failed to check active call", err)
}
if call == nil {
s.logger.Info("JoinCall: no active call, starting new one", zap.String("channel_id", channelID))
assignment, _, err := s.StartCall(ctx, channelID, audioOnly)
if err != nil {
return nil, nil, err
}
return assignment, []voiceassign.VoiceParticipant{}, nil
}
assignment, err := s.voiceAssign.AssignToVoice(ctx, channelID, userUUID.String(), audioOnly)
if err != nil {
return nil, nil, errors.Internal("failed to assign voice server", err)
}
participants, _ := s.voiceAssign.GetVoiceParticipants(ctx, channelID)
var result []voiceassign.VoiceParticipant
for _, p := range participants {
if p.UserID != userUUID.String() {
result = append(result, *p)
}
}
channel, _ := s.repo.GetByID(ctx, channelUUID)
if channel != nil {
otherUserID := channel.User1ID
if otherUserID == userUUID {
otherUserID = channel.User2ID
}
s.hub.BroadcastToUser(otherUserID.String(), &streamv1.ServerEvent{
EventId: uuid.New().String(),
CreatedAt: timestamppb.Now(),
Payload: &streamv1.ServerEvent_VoiceUserJoined{
VoiceUserJoined: &streamv1.VoiceUserJoined{
RoomId: channelID,
UserId: userUUID.String(),
AudioOnly: audioOnly,
},
},
})
}
return assignment, result, nil
}
func (s *Service) broadcastCallStarted(ctx context.Context, channelID, starterID uuid.UUID, audioOnly bool) {
channel, _ := s.repo.GetByID(ctx, channelID)
if channel != nil {
otherUserID := channel.User1ID
if otherUserID == starterID {
otherUserID = channel.User2ID
}
// Notify the other user that a call started
s.hub.BroadcastToUser(otherUserID.String(), &streamv1.ServerEvent{
EventId: uuid.New().String(),
CreatedAt: timestamppb.Now(),
Payload: &streamv1.ServerEvent_DmCallStarted{
DmCallStarted: &streamv1.DMCallStarted{
ChannelId: channelID.String(),
CallerId: starterID.String(),
AudioOnly: audioOnly,
},
},
})
}
}
func (s *Service) LeaveCall(ctx context.Context, channelID string) error {
channelUUID, userUUID, err := s.checkParticipant(ctx, channelID)
if err != nil {
return err
}
if err := s.voiceAssign.LeaveVoice(ctx, channelID, userUUID.String()); err != nil {
s.logger.Warn("failed to leave voice", zap.Error(err))
}
channel, _ := s.repo.GetByID(ctx, channelUUID)
if channel != nil {
otherUserID := channel.User1ID
if otherUserID == userUUID {
otherUserID = channel.User2ID
}
s.hub.BroadcastToUser(otherUserID.String(), &streamv1.ServerEvent{
EventId: uuid.New().String(),
CreatedAt: timestamppb.Now(),
Payload: &streamv1.ServerEvent_VoiceUserLeft{
VoiceUserLeft: &streamv1.VoiceUserLeft{
RoomId: channelID,
UserId: userUUID.String(),
},
},
})
}
participants, _ := s.voiceAssign.GetVoiceParticipants(ctx, channelID)
if len(participants) == 0 {
_ = s.repo.EndActiveCall(ctx, channelUUID)
if channel != nil {
s.hub.BroadcastToUser(channel.User1ID.String(), &streamv1.ServerEvent{
EventId: uuid.New().String(),
CreatedAt: timestamppb.Now(),
Payload: &streamv1.ServerEvent_DmCallEnded{
DmCallEnded: &streamv1.DMCallEnded{
ChannelId: channelID,
UserId: userUUID.String(),
},
},
})
s.hub.BroadcastToUser(channel.User2ID.String(), &streamv1.ServerEvent{
EventId: uuid.New().String(),
CreatedAt: timestamppb.Now(),
Payload: &streamv1.ServerEvent_DmCallEnded{
DmCallEnded: &streamv1.DMCallEnded{
ChannelId: channelID,
UserId: userUUID.String(),
},
},
})
}
}
return nil
}
func (s *Service) EndCall(ctx context.Context, channelID string) error {
channelUUID, userUUID, err := s.checkParticipant(ctx, channelID)
if err != nil {
return err
}
call, err := s.repo.GetActiveCall(ctx, channelUUID)
if err != nil {
return errors.Internal("failed to check active call", err)
}
if call == nil {
return errors.NotFound("no active call")
}
if err := s.repo.EndCall(ctx, call.ID); err != nil {
return errors.Internal("failed to end call", err)
}
channel, _ := s.repo.GetByID(ctx, channelUUID)
if channel != nil {
s.hub.BroadcastToUser(channel.User1ID.String(), &streamv1.ServerEvent{
EventId: uuid.New().String(),
CreatedAt: timestamppb.Now(),
Payload: &streamv1.ServerEvent_VoiceUserLeft{
VoiceUserLeft: &streamv1.VoiceUserLeft{
RoomId: channelID,
UserId: userUUID.String(),
},
},
})
s.hub.BroadcastToUser(channel.User2ID.String(), &streamv1.ServerEvent{
EventId: uuid.New().String(),
CreatedAt: timestamppb.Now(),
Payload: &streamv1.ServerEvent_VoiceUserLeft{
VoiceUserLeft: &streamv1.VoiceUserLeft{
RoomId: channelID,
UserId: userUUID.String(),
},
},
})
}
return nil
}
func (s *Service) GetCallStatus(ctx context.Context, channelID string) (*DMCall, []*voiceassign.VoiceParticipant, error) {
channelUUID, _, err := s.checkParticipant(ctx, channelID)
if err != nil {
return nil, nil, err
}
call, err := s.repo.GetActiveCall(ctx, channelUUID)
if err != nil {
return nil, nil, errors.Internal("failed to get call status", err)
}
if call == nil {
return nil, nil, nil
}
participants, _ := s.voiceAssign.GetVoiceParticipants(ctx, channelID)
return call, participants, nil
}
func (s *Service) broadcastMessageCreated(ctx context.Context, channelID uuid.UUID, msg *DMMessage) {
channel, _ := s.repo.GetByID(ctx, channelID)
if channel == nil || s.hub == nil {
return
}
event := &streamv1.ServerEvent{
EventId: uuid.New().String(),
CreatedAt: timestamppb.Now(),
Payload: &streamv1.ServerEvent_DmMessageCreated{
DmMessageCreated: &streamv1.DMMessageCreated{
ChannelId: channelID.String(),
Message: dmMessageToProto(msg),
},
},
}
s.hub.BroadcastToUser(channel.User1ID.String(), event)
s.hub.BroadcastToUser(channel.User2ID.String(), event)
}
func (s *Service) broadcastMessageEdited(ctx context.Context, msg *DMMessage) {
channel, _ := s.repo.GetByID(ctx, msg.ChannelID)
if channel == nil || s.hub == nil {
return
}
event := &streamv1.ServerEvent{
EventId: uuid.New().String(),
CreatedAt: timestamppb.Now(),
Payload: &streamv1.ServerEvent_MessageEdited{
MessageEdited: &streamv1.MessageEdited{
Message: dmMessageToCommonProto(msg),
},
},
}
s.hub.BroadcastToUser(channel.User1ID.String(), event)
s.hub.BroadcastToUser(channel.User2ID.String(), event)
}
func (s *Service) broadcastMessageDeleted(ctx context.Context, channelID uuid.UUID, messageID string) {
channel, _ := s.repo.GetByID(ctx, channelID)
if channel == nil || s.hub == nil {
return
}
event := &streamv1.ServerEvent{
EventId: uuid.New().String(),
CreatedAt: timestamppb.Now(),
Payload: &streamv1.ServerEvent_MessageDeleted{
MessageDeleted: &streamv1.MessageDeleted{
MessageId: messageID,
RoomId: channelID.String(),
},
},
}
s.hub.BroadcastToUser(channel.User1ID.String(), event)
s.hub.BroadcastToUser(channel.User2ID.String(), event)
}
func (s *Service) broadcastReactionAdded(ctx context.Context, channelID uuid.UUID, messageID string, reaction *DMReaction) {
channel, _ := s.repo.GetByID(ctx, channelID)
if channel == nil || s.hub == nil {
return
}
event := &streamv1.ServerEvent{
EventId: uuid.New().String(),
CreatedAt: timestamppb.Now(),
Payload: &streamv1.ServerEvent_MessageReactionAdded{
MessageReactionAdded: &streamv1.MessageReactionAdded{
MessageId: messageID,
RoomId: channelID.String(),
Reaction: dmReactionToProto(reaction),
},
},
}
s.hub.BroadcastToUser(channel.User1ID.String(), event)
s.hub.BroadcastToUser(channel.User2ID.String(), event)
}
func (s *Service) broadcastReactionRemoved(ctx context.Context, channelID uuid.UUID, messageID, reactionID, userID string) {
channel, _ := s.repo.GetByID(ctx, channelID)
if channel == nil || s.hub == nil {
return
}
event := &streamv1.ServerEvent{
EventId: uuid.New().String(),
CreatedAt: timestamppb.Now(),
Payload: &streamv1.ServerEvent_MessageReactionRemoved{
MessageReactionRemoved: &streamv1.MessageReactionRemoved{
MessageId: messageID,
RoomId: channelID.String(),
ReactionId: reactionID,
UserId: userID,
},
},
}
s.hub.BroadcastToUser(channel.User1ID.String(), event)
s.hub.BroadcastToUser(channel.User2ID.String(), event)
}
func (s *Service) broadcastMessagePinned(ctx context.Context, channelID uuid.UUID, messageID, pinnedBy string) {
channel, _ := s.repo.GetByID(ctx, channelID)
if channel == nil || s.hub == nil {
return
}
event := &streamv1.ServerEvent{
EventId: uuid.New().String(),
CreatedAt: timestamppb.Now(),
Payload: &streamv1.ServerEvent_MessagePinned{
MessagePinned: &streamv1.MessagePinned{
MessageId: messageID,
RoomId: channelID.String(),
PinnedBy: pinnedBy,
},
},
}
s.hub.BroadcastToUser(channel.User1ID.String(), event)
s.hub.BroadcastToUser(channel.User2ID.String(), event)
}
func (s *Service) broadcastMessageUnpinned(ctx context.Context, channelID uuid.UUID, messageID string) {
channel, _ := s.repo.GetByID(ctx, channelID)
if channel == nil || s.hub == nil {
return
}
event := &streamv1.ServerEvent{
EventId: uuid.New().String(),
CreatedAt: timestamppb.Now(),
Payload: &streamv1.ServerEvent_MessageUnpinned{
MessageUnpinned: &streamv1.MessageUnpinned{
MessageId: messageID,
RoomId: channelID.String(),
},
},
}
s.hub.BroadcastToUser(channel.User1ID.String(), event)
s.hub.BroadcastToUser(channel.User2ID.String(), event)
}
func (s *Service) GetOtherParticipant(ctx context.Context, channelID, userID uuid.UUID) (uuid.UUID, error) {
channel, err := s.repo.GetByID(ctx, channelID)
if err != nil {
return uuid.Nil, err
}
if channel.User1ID == userID {
return channel.User2ID, nil
}
return channel.User1ID, nil
}
func dmReactionToProto(reaction *DMReaction) *commonv1.MessageReaction {
return &commonv1.MessageReaction{
Id: reaction.ID.String(),
MessageId: strconv.FormatInt(reaction.MessageID, 10),
UserId: reaction.UserID.String(),
Emoji: reaction.Emoji,
CreatedAt: timestamppb.New(reaction.CreatedAt),
}
}