aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--telegram/client.go6
-rw-r--r--telegram/commands.go7
-rw-r--r--telegram/handlers.go28
-rw-r--r--telegram/utils.go74
-rw-r--r--xmpp/gateway/gateway.go14
-rw-r--r--xmpp/handlers.go3
6 files changed, 89 insertions, 43 deletions
diff --git a/telegram/client.go b/telegram/client.go
index 71d8125..61d46aa 100644
--- a/telegram/client.go
+++ b/telegram/client.go
@@ -52,6 +52,7 @@ type Client struct {
jid string
Session *persistence.Session
resources map[string]bool
+ outbox map[string]string
content *config.TelegramContentConfig
cache *cache.Cache
online bool
@@ -59,13 +60,15 @@ type Client struct {
DelayedStatuses map[int64]*DelayedStatus
DelayedStatusesLock sync.Mutex
- locks clientLocks
+ locks clientLocks
+ SendMessageLock sync.Mutex
}
type clientLocks struct {
authorizationReady sync.Mutex
chatMessageLocks map[int64]*sync.Mutex
resourcesLock sync.Mutex
+ outboxLock sync.Mutex
}
// NewClient instantiates a Telegram App
@@ -121,6 +124,7 @@ func NewClient(conf config.TelegramConfig, jid string, component *xmpp.Component
jid: jid,
Session: session,
resources: make(map[string]bool),
+ outbox: make(map[string]string),
content: &conf.Content,
cache: cache.NewCache(),
options: options,
diff --git a/telegram/commands.go b/telegram/commands.go
index 2f879b1..53b5d75 100644
--- a/telegram/commands.go
+++ b/telegram/commands.go
@@ -513,11 +513,14 @@ func (c *Client) ProcessChatCommand(chatID int64, cmdline string) (string, bool)
content := c.PrepareOutgoingMessageContent(rawCmdArguments(cmdline, 0))
if content != nil {
- c.client.EditMessageText(&client.EditMessageTextRequest{
+ _, err = c.client.EditMessageText(&client.EditMessageTextRequest{
ChatId: chatID,
MessageId: message.Id,
InputMessageContent: content,
})
+ if err != nil {
+ return "Message editing error", true
+ }
} else {
return "Message processing error", true
}
@@ -650,7 +653,7 @@ func (c *Client) ProcessChatCommand(chatID int64, cmdline string) (string, bool)
}
if messages != nil && messages.Messages != nil {
for _, message := range messages.Messages {
- c.ProcessIncomingMessage(targetChatId, message)
+ c.ProcessIncomingMessage(targetChatId, message, "")
}
}
// print vCard
diff --git a/telegram/handlers.go b/telegram/handlers.go
index 65e5f2f..8173ecd 100644
--- a/telegram/handlers.go
+++ b/telegram/handlers.go
@@ -203,26 +203,30 @@ func (c *Client) updateChatLastMessage(update *client.UpdateChatLastMessage) {
// message received
func (c *Client) updateNewMessage(update *client.UpdateNewMessage) {
- go func() {
- chatId := update.Message.ChatId
+ chatId := update.Message.ChatId
+
+ c.SendMessageLock.Lock()
+ c.SendMessageLock.Unlock()
+ xmppId, err := gateway.IdsDB.GetByTgIds(c.Session.Login, c.jid, chatId, update.Message.Id)
+ var ignoredResource string
+ if err == nil {
+ ignoredResource = c.popFromOutbox(xmppId)
+ } else {
+ log.Infof("Couldn't retrieve XMPP message ids for %v, an echo may happen", update.Message.Id)
+ }
+ log.Warnf("xmppId: %v, ignoredResource: %v", xmppId, ignoredResource)
- // guarantee sequential message delivering per chat
- lock := c.getChatMessageLock(chatId)
+ // guarantee sequential message delivering per chat
+ lock := c.getChatMessageLock(chatId)
+ go func() {
lock.Lock()
defer lock.Unlock()
- // ignore self outgoing messages
- if update.Message.IsOutgoing &&
- update.Message.SendingState != nil &&
- update.Message.SendingState.MessageSendingStateType() == client.TypeMessageSendingStatePending {
- return
- }
-
log.WithFields(log.Fields{
"chat_id": chatId,
}).Warn("New message from chat")
- c.ProcessIncomingMessage(chatId, update.Message)
+ c.ProcessIncomingMessage(chatId, update.Message, ignoredResource)
}()
}
diff --git a/telegram/utils.go b/telegram/utils.go
index a9e0bc8..9664857 100644
--- a/telegram/utils.go
+++ b/telegram/utils.go
@@ -890,9 +890,34 @@ func (c *Client) ensureDownloadFile(file *client.File) *client.File {
}
// ProcessIncomingMessage transfers a message to XMPP side and marks it as read on Telegram side
-func (c *Client) ProcessIncomingMessage(chatId int64, message *client.Message) {
- var text, oob, auxText string
+func (c *Client) ProcessIncomingMessage(chatId int64, message *client.Message, ignoredResource string) {
+ var jids []string
+ var isPM bool
var err error
+ if gateway.MessageOutgoingPermission && c.Session.Carbons {
+ isPM, err = c.IsPM(chatId)
+ if err != nil {
+ log.Errorf("Could not determine if chat is PM: %v", err)
+ }
+ }
+ isOutgoing := message.IsOutgoing
+ isCarbon := isPM && isOutgoing
+ log.Warnf("isOutgoing: %v", isOutgoing)
+ if isOutgoing {
+ for resource := range c.resourcesRange() {
+ if ignoredResource == "" || resource != ignoredResource {
+ jids = append(jids, c.jid+"/"+resource)
+ }
+ }
+ if len(jids) == 0 {
+ log.Info("The only resource is ignored, aborting")
+ return
+ }
+ } else {
+ jids = []string{c.jid}
+ }
+
+ var text, oob, auxText string
reply, replyMsg := c.getMessageReply(message)
@@ -965,27 +990,10 @@ func (c *Client) ProcessIncomingMessage(chatId int64, message *client.Message) {
sId := strconv.FormatInt(message.Id, 10)
sChatId := strconv.FormatInt(chatId, 10)
- var jids []string
- var isPM bool
- if gateway.MessageOutgoingPermission && c.Session.Carbons {
- isPM, err = c.IsPM(chatId)
- if err != nil {
- log.Errorf("Could not determine if chat is PM: %v", err)
- }
- }
- isOutgoing := isPM && message.IsOutgoing
- if isOutgoing {
- for resource := range c.resourcesRange() {
- jids = append(jids, c.jid+"/"+resource)
- }
- } else {
- jids = []string{c.jid}
- }
-
for _, jid := range jids {
- gateway.SendMessageWithOOB(jid, sChatId, text, sId, c.xmpp, reply, oob, isOutgoing)
+ gateway.SendMessageWithOOB(jid, sChatId, text, sId, c.xmpp, reply, oob, isCarbon)
if auxText != "" {
- gateway.SendMessage(jid, sChatId, auxText, sId, c.xmpp, reply, isOutgoing)
+ gateway.SendMessage(jid, sChatId, auxText, sId, c.xmpp, reply, isCarbon)
}
}
}
@@ -1172,9 +1180,12 @@ func (c *Client) resourcesRange() chan string {
// resend statuses to (to another resource, for example)
func (c *Client) roster(resource string) {
+ c.locks.resourcesLock.Lock()
if _, ok := c.resources[resource]; ok {
+ c.locks.resourcesLock.Unlock()
return // we know it
}
+ c.locks.resourcesLock.Unlock()
log.Warnf("Sending roster for %v", resource)
@@ -1347,3 +1358,24 @@ func (c *Client) UpdateChatNicknames() {
}
}
}
+
+// AddToOutbox remembers the resource from which a message with given ID was sent
+func (c *Client) AddToOutbox(xmppId, resource string) {
+ c.locks.outboxLock.Lock()
+ defer c.locks.outboxLock.Unlock()
+
+ c.outbox[xmppId] = resource
+}
+
+func (c *Client) popFromOutbox(xmppId string) string {
+ c.locks.outboxLock.Lock()
+ defer c.locks.outboxLock.Unlock()
+
+ resource, ok := c.outbox[xmppId]
+ if ok {
+ delete(c.outbox, xmppId)
+ } else {
+ log.Warnf("No %v xmppId in outbox", xmppId)
+ }
+ return resource
+}
diff --git a/xmpp/gateway/gateway.go b/xmpp/gateway/gateway.go
index 29c8a07..7e54ee5 100644
--- a/xmpp/gateway/gateway.go
+++ b/xmpp/gateway/gateway.go
@@ -42,8 +42,8 @@ var DirtySessions = false
var MessageOutgoingPermission = false
// SendMessage creates and sends a message stanza
-func SendMessage(to string, from string, body string, id string, component *xmpp.Component, reply *Reply, isOutgoing bool) {
- sendMessageWrapper(to, from, body, id, component, reply, "", isOutgoing)
+func SendMessage(to string, from string, body string, id string, component *xmpp.Component, reply *Reply, isCarbon bool) {
+ sendMessageWrapper(to, from, body, id, component, reply, "", isCarbon)
}
// SendServiceMessage creates and sends a simple message stanza from transport
@@ -57,11 +57,11 @@ func SendTextMessage(to string, from string, body string, component *xmpp.Compon
}
// SendMessageWithOOB creates and sends a message stanza with OOB URL
-func SendMessageWithOOB(to string, from string, body string, id string, component *xmpp.Component, reply *Reply, oob string, isOutgoing bool) {
- sendMessageWrapper(to, from, body, id, component, reply, oob, isOutgoing)
+func SendMessageWithOOB(to string, from string, body string, id string, component *xmpp.Component, reply *Reply, oob string, isCarbon bool) {
+ sendMessageWrapper(to, from, body, id, component, reply, oob, isCarbon)
}
-func sendMessageWrapper(to string, from string, body string, id string, component *xmpp.Component, reply *Reply, oob string, isOutgoing bool) {
+func sendMessageWrapper(to string, from string, body string, id string, component *xmpp.Component, reply *Reply, oob string, isCarbon bool) {
toJid, err := stanza.NewJid(to)
if err != nil {
log.WithFields(log.Fields{
@@ -83,7 +83,7 @@ func sendMessageWrapper(to string, from string, body string, id string, componen
logFrom = from
messageFrom = from + "@" + componentJid
}
- if isOutgoing {
+ if isCarbon {
messageTo = messageFrom
messageFrom = bareTo + "/" + Jid.Resource
} else {
@@ -120,7 +120,7 @@ func sendMessageWrapper(to string, from string, body string, id string, componen
}
}
- if isOutgoing {
+ if isCarbon {
carbonMessage := extensions.ClientMessage{
Attrs: stanza.Attrs{
From: bareTo,
diff --git a/xmpp/handlers.go b/xmpp/handlers.go
index 1286914..e6671bc 100644
--- a/xmpp/handlers.go
+++ b/xmpp/handlers.go
@@ -167,6 +167,8 @@ func HandleMessage(s xmpp.Sender, p stanza.Packet) {
}
}
+ session.SendMessageLock.Lock()
+ defer session.SendMessageLock.Unlock()
tgMessageId := session.ProcessOutgoingMessage(toID, text, msg.From, replyId, replaceId)
if tgMessageId != 0 {
if replaceId != 0 {
@@ -181,6 +183,7 @@ func HandleMessage(s xmpp.Sender, p stanza.Packet) {
log.Errorf("Failed to save ids %v/%v %v", toID, tgMessageId, msg.Id)
}
}
+ session.AddToOutbox(msg.Id, resource)
} else {
/*
// if a message failed to edit on Telegram side, match new XMPP ID with old Telegram ID anyway