diff options
author | bodqhrohro <bodqhrohro@gmail.com> | 2019-12-30 08:01:56 +0300 |
---|---|---|
committer | bodqhrohro <bodqhrohro@gmail.com> | 2019-12-30 08:01:56 +0300 |
commit | 7ea5e9ac73199f264961e5d9845f302b41108e13 (patch) | |
tree | 6182672b64295a1672f216da556889e70b515c6b /telegram | |
parent | 307d5136d4e4eeee20591578bed237f58addab95 (diff) |
Asynchronous message processing with guaranteed sequential per-chat delivery
Diffstat (limited to 'telegram')
-rw-r--r-- | telegram/client.go | 5 | ||||
-rw-r--r-- | telegram/handlers.go | 159 |
2 files changed, 93 insertions, 71 deletions
diff --git a/telegram/client.go b/telegram/client.go index dde15f2..7e7cdc8 100644 --- a/telegram/client.go +++ b/telegram/client.go @@ -55,6 +55,7 @@ type Client struct { type clientLocks struct { authorizationReady sync.WaitGroup + chatMessageLocks map[int64]*sync.Mutex } // NewClient instantiates a Telegram App @@ -107,6 +108,8 @@ func NewClient(conf config.TelegramConfig, jid string, component *xmpp.Component content: &conf.Content, cache: cache.NewCache(), options: options, - locks: clientLocks{}, + locks: clientLocks{ + chatMessageLocks: make(map[int64]*sync.Mutex), + }, }, nil } diff --git a/telegram/handlers.go b/telegram/handlers.go index c79f765..847bac8 100644 --- a/telegram/handlers.go +++ b/telegram/handlers.go @@ -35,6 +35,16 @@ func int64SliceToStringSlice(ints []int64) []string { return strings } +func (c *Client) getChatMessageLock(chatID int64) *sync.Mutex { + lock, ok := c.locks.chatMessageLocks[chatID] + if !ok { + lock = &sync.Mutex{} + c.locks.chatMessageLocks[chatID] = lock + } + + return lock +} + func (c *Client) updateHandler() { listener := c.client.GetListener() defer listener.Close() @@ -121,94 +131,103 @@ func (c *Client) updateUserStatus(update *client.UpdateUserStatus) { // new chat discovered func (c *Client) updateNewChat(update *client.UpdateNewChat) { - if update.Chat != nil && update.Chat.Photo != nil && update.Chat.Photo.Small != nil { - _, err := c.client.DownloadFile(&client.DownloadFileRequest{ - FileId: update.Chat.Photo.Small.Id, - Priority: 32, - Synchronous: true, - }) + go func() { + if update.Chat != nil && update.Chat.Photo != nil && update.Chat.Photo.Small != nil { + _, err := c.client.DownloadFile(&client.DownloadFileRequest{ + FileId: update.Chat.Photo.Small.Id, + Priority: 32, + Synchronous: true, + }) - if err != nil { - log.Error("Failed to download the chat photo") + if err != nil { + log.Error("Failed to download the chat photo") + } } - } - c.cache.SetChat(update.Chat.Id, update.Chat) + c.cache.SetChat(update.Chat.Id, update.Chat) - var isChannel = false - if update.Chat.Type.ChatTypeType() == client.TypeChatTypeSupergroup { - typeSupergroup, ok := update.Chat.Type.(*client.ChatTypeSupergroup) - if !ok { - uhOh() + var isChannel = false + if update.Chat.Type.ChatTypeType() == client.TypeChatTypeSupergroup { + typeSupergroup, ok := update.Chat.Type.(*client.ChatTypeSupergroup) + if !ok { + uhOh() + } + isChannel = typeSupergroup.IsChannel } - isChannel = typeSupergroup.IsChannel - } - if !(isChannel && update.Chat.LastReadInboxMessageId == 0) { - gateway.SendPresence( - c.xmpp, - c.jid, - gateway.SPFrom(strconv.FormatInt(update.Chat.Id, 10)), - gateway.SPType("subscribe"), - gateway.SPNickname(update.Chat.Title), - ) - } + if !(isChannel && update.Chat.LastReadInboxMessageId == 0) { + gateway.SendPresence( + c.xmpp, + c.jid, + gateway.SPFrom(strconv.FormatInt(update.Chat.Id, 10)), + gateway.SPType("subscribe"), + gateway.SPNickname(update.Chat.Title), + ) + } - if update.Chat.Id < 0 { - go c.processStatusUpdate(update.Chat.Id, update.Chat.Title, "chat") - } + if update.Chat.Id < 0 { + c.processStatusUpdate(update.Chat.Id, update.Chat.Title, "chat") + } + }() } // message received func (c *Client) updateNewMessage(update *client.UpdateNewMessage) { - // ignore self outgoing messages - if update.Message.IsOutgoing && - update.Message.SendingState != nil && - update.Message.SendingState.MessageSendingStateType() == client.TypeMessageSendingStatePending { - return - } + go func() { + // guarantee sequential message delivering per chat + lock := c.getChatMessageLock(update.Message.ChatId) + lock.Lock() + defer lock.Unlock() - log.WithFields(log.Fields{ - "chat_id": update.Message.ChatId, - }).Warn("New message from chat") + // ignore self outgoing messages + if update.Message.IsOutgoing && + update.Message.SendingState != nil && + update.Message.SendingState.MessageSendingStateType() == client.TypeMessageSendingStatePending { + return + } - text := c.messageToText(update.Message) - file, filename := c.contentToFilename(update.Message.Content) + log.WithFields(log.Fields{ + "chat_id": update.Message.ChatId, + }).Warn("New message from chat") - // download file(s) - if file != nil && !file.Local.IsDownloadingCompleted { - c.client.DownloadFile(&client.DownloadFileRequest{ - FileId: file.Id, - Priority: 32, - Synchronous: true, - }) - } - // OTR support (I do not know why would you need it, seriously) - if !strings.HasPrefix(text, "?OTR") { - var prefix strings.Builder - prefix.WriteString(c.messageToPrefix(update.Message, c.formatContent(file, filename))) - if text != "" { - // \n if it is groupchat and message is not empty - if update.Message.ChatId < 0 { - prefix.WriteString("\n") - } else if update.Message.ChatId > 0 { - prefix.WriteString(" | ") - } + text := c.messageToText(update.Message) + file, filename := c.contentToFilename(update.Message.Content) - prefix.WriteString(text) + // download file(s) + if file != nil && !file.Local.IsDownloadingCompleted { + c.client.DownloadFile(&client.DownloadFileRequest{ + FileId: file.Id, + Priority: 32, + Synchronous: true, + }) } + // OTR support (I do not know why would you need it, seriously) + if !strings.HasPrefix(text, "?OTR") { + var prefix strings.Builder + prefix.WriteString(c.messageToPrefix(update.Message, c.formatContent(file, filename))) + if text != "" { + // \n if it is groupchat and message is not empty + if update.Message.ChatId < 0 { + prefix.WriteString("\n") + } else if update.Message.ChatId > 0 { + prefix.WriteString(" | ") + } - text = prefix.String() - } + prefix.WriteString(text) + } - // mark message as read - c.client.ViewMessages(&client.ViewMessagesRequest{ - ChatId: update.Message.ChatId, - MessageIds: []int64{update.Message.Id}, - ForceRead: true, - }) - // forward message to XMPP - gateway.SendMessage(c.jid, strconv.FormatInt(update.Message.ChatId, 10), text, c.xmpp) + text = prefix.String() + } + + // mark message as read + c.client.ViewMessages(&client.ViewMessagesRequest{ + ChatId: update.Message.ChatId, + MessageIds: []int64{update.Message.Id}, + ForceRead: true, + }) + // forward message to XMPP + gateway.SendMessage(c.jid, strconv.FormatInt(update.Message.ChatId, 10), text, c.xmpp) + }() } // message content updated |