aboutsummaryrefslogtreecommitdiff
path: root/telegram
diff options
context:
space:
mode:
authorbodqhrohro <bodqhrohro@gmail.com>2019-12-30 08:01:56 +0300
committerbodqhrohro <bodqhrohro@gmail.com>2019-12-30 08:01:56 +0300
commit7ea5e9ac73199f264961e5d9845f302b41108e13 (patch)
tree6182672b64295a1672f216da556889e70b515c6b /telegram
parent307d5136d4e4eeee20591578bed237f58addab95 (diff)
Asynchronous message processing with guaranteed sequential per-chat delivery
Diffstat (limited to 'telegram')
-rw-r--r--telegram/client.go5
-rw-r--r--telegram/handlers.go159
2 files changed, 93 insertions, 71 deletions
diff --git a/telegram/client.go b/telegram/client.go
index dde15f2..7e7cdc8 100644
--- a/telegram/client.go
+++ b/telegram/client.go
@@ -55,6 +55,7 @@ type Client struct {
type clientLocks struct {
authorizationReady sync.WaitGroup
+ chatMessageLocks map[int64]*sync.Mutex
}
// NewClient instantiates a Telegram App
@@ -107,6 +108,8 @@ func NewClient(conf config.TelegramConfig, jid string, component *xmpp.Component
content: &conf.Content,
cache: cache.NewCache(),
options: options,
- locks: clientLocks{},
+ locks: clientLocks{
+ chatMessageLocks: make(map[int64]*sync.Mutex),
+ },
}, nil
}
diff --git a/telegram/handlers.go b/telegram/handlers.go
index c79f765..847bac8 100644
--- a/telegram/handlers.go
+++ b/telegram/handlers.go
@@ -35,6 +35,16 @@ func int64SliceToStringSlice(ints []int64) []string {
return strings
}
+func (c *Client) getChatMessageLock(chatID int64) *sync.Mutex {
+ lock, ok := c.locks.chatMessageLocks[chatID]
+ if !ok {
+ lock = &sync.Mutex{}
+ c.locks.chatMessageLocks[chatID] = lock
+ }
+
+ return lock
+}
+
func (c *Client) updateHandler() {
listener := c.client.GetListener()
defer listener.Close()
@@ -121,94 +131,103 @@ func (c *Client) updateUserStatus(update *client.UpdateUserStatus) {
// new chat discovered
func (c *Client) updateNewChat(update *client.UpdateNewChat) {
- if update.Chat != nil && update.Chat.Photo != nil && update.Chat.Photo.Small != nil {
- _, err := c.client.DownloadFile(&client.DownloadFileRequest{
- FileId: update.Chat.Photo.Small.Id,
- Priority: 32,
- Synchronous: true,
- })
+ go func() {
+ if update.Chat != nil && update.Chat.Photo != nil && update.Chat.Photo.Small != nil {
+ _, err := c.client.DownloadFile(&client.DownloadFileRequest{
+ FileId: update.Chat.Photo.Small.Id,
+ Priority: 32,
+ Synchronous: true,
+ })
- if err != nil {
- log.Error("Failed to download the chat photo")
+ if err != nil {
+ log.Error("Failed to download the chat photo")
+ }
}
- }
- c.cache.SetChat(update.Chat.Id, update.Chat)
+ c.cache.SetChat(update.Chat.Id, update.Chat)
- var isChannel = false
- if update.Chat.Type.ChatTypeType() == client.TypeChatTypeSupergroup {
- typeSupergroup, ok := update.Chat.Type.(*client.ChatTypeSupergroup)
- if !ok {
- uhOh()
+ var isChannel = false
+ if update.Chat.Type.ChatTypeType() == client.TypeChatTypeSupergroup {
+ typeSupergroup, ok := update.Chat.Type.(*client.ChatTypeSupergroup)
+ if !ok {
+ uhOh()
+ }
+ isChannel = typeSupergroup.IsChannel
}
- isChannel = typeSupergroup.IsChannel
- }
- if !(isChannel && update.Chat.LastReadInboxMessageId == 0) {
- gateway.SendPresence(
- c.xmpp,
- c.jid,
- gateway.SPFrom(strconv.FormatInt(update.Chat.Id, 10)),
- gateway.SPType("subscribe"),
- gateway.SPNickname(update.Chat.Title),
- )
- }
+ if !(isChannel && update.Chat.LastReadInboxMessageId == 0) {
+ gateway.SendPresence(
+ c.xmpp,
+ c.jid,
+ gateway.SPFrom(strconv.FormatInt(update.Chat.Id, 10)),
+ gateway.SPType("subscribe"),
+ gateway.SPNickname(update.Chat.Title),
+ )
+ }
- if update.Chat.Id < 0 {
- go c.processStatusUpdate(update.Chat.Id, update.Chat.Title, "chat")
- }
+ if update.Chat.Id < 0 {
+ c.processStatusUpdate(update.Chat.Id, update.Chat.Title, "chat")
+ }
+ }()
}
// message received
func (c *Client) updateNewMessage(update *client.UpdateNewMessage) {
- // ignore self outgoing messages
- if update.Message.IsOutgoing &&
- update.Message.SendingState != nil &&
- update.Message.SendingState.MessageSendingStateType() == client.TypeMessageSendingStatePending {
- return
- }
+ go func() {
+ // guarantee sequential message delivering per chat
+ lock := c.getChatMessageLock(update.Message.ChatId)
+ lock.Lock()
+ defer lock.Unlock()
- log.WithFields(log.Fields{
- "chat_id": update.Message.ChatId,
- }).Warn("New message from chat")
+ // ignore self outgoing messages
+ if update.Message.IsOutgoing &&
+ update.Message.SendingState != nil &&
+ update.Message.SendingState.MessageSendingStateType() == client.TypeMessageSendingStatePending {
+ return
+ }
- text := c.messageToText(update.Message)
- file, filename := c.contentToFilename(update.Message.Content)
+ log.WithFields(log.Fields{
+ "chat_id": update.Message.ChatId,
+ }).Warn("New message from chat")
- // download file(s)
- if file != nil && !file.Local.IsDownloadingCompleted {
- c.client.DownloadFile(&client.DownloadFileRequest{
- FileId: file.Id,
- Priority: 32,
- Synchronous: true,
- })
- }
- // OTR support (I do not know why would you need it, seriously)
- if !strings.HasPrefix(text, "?OTR") {
- var prefix strings.Builder
- prefix.WriteString(c.messageToPrefix(update.Message, c.formatContent(file, filename)))
- if text != "" {
- // \n if it is groupchat and message is not empty
- if update.Message.ChatId < 0 {
- prefix.WriteString("\n")
- } else if update.Message.ChatId > 0 {
- prefix.WriteString(" | ")
- }
+ text := c.messageToText(update.Message)
+ file, filename := c.contentToFilename(update.Message.Content)
- prefix.WriteString(text)
+ // download file(s)
+ if file != nil && !file.Local.IsDownloadingCompleted {
+ c.client.DownloadFile(&client.DownloadFileRequest{
+ FileId: file.Id,
+ Priority: 32,
+ Synchronous: true,
+ })
}
+ // OTR support (I do not know why would you need it, seriously)
+ if !strings.HasPrefix(text, "?OTR") {
+ var prefix strings.Builder
+ prefix.WriteString(c.messageToPrefix(update.Message, c.formatContent(file, filename)))
+ if text != "" {
+ // \n if it is groupchat and message is not empty
+ if update.Message.ChatId < 0 {
+ prefix.WriteString("\n")
+ } else if update.Message.ChatId > 0 {
+ prefix.WriteString(" | ")
+ }
- text = prefix.String()
- }
+ prefix.WriteString(text)
+ }
- // mark message as read
- c.client.ViewMessages(&client.ViewMessagesRequest{
- ChatId: update.Message.ChatId,
- MessageIds: []int64{update.Message.Id},
- ForceRead: true,
- })
- // forward message to XMPP
- gateway.SendMessage(c.jid, strconv.FormatInt(update.Message.ChatId, 10), text, c.xmpp)
+ text = prefix.String()
+ }
+
+ // mark message as read
+ c.client.ViewMessages(&client.ViewMessagesRequest{
+ ChatId: update.Message.ChatId,
+ MessageIds: []int64{update.Message.Id},
+ ForceRead: true,
+ })
+ // forward message to XMPP
+ gateway.SendMessage(c.jid, strconv.FormatInt(update.Message.ChatId, 10), text, c.xmpp)
+ }()
}
// message content updated