From 9a84e9a8b6b7a6f953301e54f19cdf4be73592e1 Mon Sep 17 00:00:00 2001 From: Bohdan Horbeshko Date: Sat, 3 Jun 2023 00:20:03 -0400 Subject: Store message ids in Badger DB --- badger/ids.go | 132 +++++++++++++++++++++++++++++++++++++++++++++++++++++ badger/ids_test.go | 72 +++++++++++++++++++++++++++++ 2 files changed, 204 insertions(+) create mode 100644 badger/ids.go create mode 100644 badger/ids_test.go (limited to 'badger') diff --git a/badger/ids.go b/badger/ids.go new file mode 100644 index 0000000..80fb9ad --- /dev/null +++ b/badger/ids.go @@ -0,0 +1,132 @@ +package badger + +import ( + "bytes" + "errors" + "fmt" + "strconv" + + badger "github.com/dgraph-io/badger/v4" + log "github.com/sirupsen/logrus" +) + +// IdsDB represents a Badger database +type IdsDB struct { + db *badger.DB +} + +// IdsDBOpen returns a new DB object +func IdsDBOpen(path string) IdsDB { + bdb, err := badger.Open(badger.DefaultOptions(path)) + if err != nil { + log.Errorf("Failed to open ids database: %v, falling back to in-memory database", path) + bdb, err = badger.Open(badger.DefaultOptions("").WithInMemory(true)) + if err != nil { + log.Fatalf("Couldn't initialize the ids database") + } + } + + return IdsDB{ + db: bdb, + } +} + +// Set stores an id pair +func (db *IdsDB) Set(tgAccount, xmppAccount string, tgChatId, tgMsgId int64, xmppId string) error { + bPrefix := toKeyPrefix(tgAccount, xmppAccount) + bTgId := toTgByteString(tgChatId, tgMsgId) + bXmppId := toXmppByteString(xmppId) + bTgKey := toByteKey(bPrefix, bTgId, "tg") + bXmppKey := toByteKey(bPrefix, bXmppId, "xmpp") + + return db.db.Update(func(txn *badger.Txn) error { + if err := txn.Set(bTgKey, bXmppId); err != nil { + return err + } + return txn.Set(bXmppKey, bTgId) + }) +} + +func (db *IdsDB) getByteValue(key []byte) ([]byte, error) { + var valCopy []byte + err := db.db.View(func(txn *badger.Txn) error { + item, err := txn.Get(key) + if err != nil { + return err + } + + valCopy, err = item.ValueCopy(nil) + return err + }) + return valCopy, err +} + +// GetByTgIds obtains an XMPP id by Telegram chat/message ids +func (db *IdsDB) GetByTgIds(tgAccount, xmppAccount string, tgChatId, tgMsgId int64) (string, error) { + val, err := db.getByteValue(toByteKey( + toKeyPrefix(tgAccount, xmppAccount), + toTgByteString(tgChatId, tgMsgId), + "tg", + )) + if err != nil { + return "", err + } + return string(val), nil +} + +// GetByXmppId obtains Telegram chat/message ids by an XMPP id +func (db *IdsDB) GetByXmppId(tgAccount, xmppAccount, xmppId string) (int64, int64, error) { + val, err := db.getByteValue(toByteKey( + toKeyPrefix(tgAccount, xmppAccount), + toXmppByteString(xmppId), + "xmpp", + )) + if err != nil { + return 0, 0, err + } + return splitTgByteString(val) +} + +func toKeyPrefix(tgAccount, xmppAccount string) []byte { + return []byte(fmt.Sprintf("%v/%v/", tgAccount, xmppAccount)) +} + +func toByteKey(prefix, suffix []byte, typ string) []byte { + key := make([]byte, 0, len(prefix) + len(suffix) + 6) + key = append(key, prefix...) + key = append(key, []byte(typ)...) + key = append(key, []byte("/")...) + key = append(key, suffix...) + return key +} + +func toTgByteString(tgChatId, tgMsgId int64) []byte { + return []byte(fmt.Sprintf("%v/%v", tgChatId, tgMsgId)) +} + +func toXmppByteString(xmppId string) []byte { + return []byte(xmppId) +} + +func splitTgByteString(val []byte) (int64, int64, error) { + parts := bytes.Split(val, []byte("/")) + if len(parts) != 2 { + return 0, 0, errors.New("Couldn't parse tg id pair") + } + tgChatId, err := strconv.ParseInt(string(parts[0]), 10, 64) + if err != nil { + return 0, 0, err + } + tgMsgId, err := strconv.ParseInt(string(parts[1]), 10, 64) + return tgChatId, tgMsgId, err +} + +// Gc compacts the value log +func (db *IdsDB) Gc() { + db.db.RunValueLogGC(0.7) +} + +// Close closes a DB +func (db *IdsDB) Close() { + db.db.Close() +} diff --git a/badger/ids_test.go b/badger/ids_test.go new file mode 100644 index 0000000..efafdeb --- /dev/null +++ b/badger/ids_test.go @@ -0,0 +1,72 @@ +package badger + +import ( + "reflect" + "testing" +) + +func TestToKeyPrefix(t *testing.T) { + if !reflect.DeepEqual(toKeyPrefix("+123456789", "test@example.com"), []byte("+123456789/test@example.com/")) { + t.Error("Wrong prefix") + } +} + +func TestToByteKey(t *testing.T) { + if !reflect.DeepEqual(toByteKey([]byte("ababa/galamaga/"), []byte("123"), "ppp"), []byte("ababa/galamaga/ppp/123")) { + t.Error("Wrong key") + } +} + +func TestToTgByteString(t *testing.T) { + if !reflect.DeepEqual(toTgByteString(-2345, 6789), []byte("-2345/6789")) { + t.Error("Wrong tg string") + } +} + +func TestToXmppByteString(t *testing.T) { + if !reflect.DeepEqual(toXmppByteString("aboba"), []byte("aboba")) { + t.Error("Wrong xmpp string") + } +} + +func TestSplitTgByteStringUnparsable(t *testing.T) { + _, _, err := splitTgByteString([]byte("@#U*&$(@#")) + if err == nil { + t.Error("Unparsable should not be parsed") + return + } + if err.Error() != "Couldn't parse tg id pair" { + t.Error("Wrong parse error") + } +} + +func TestSplitTgByteManyParts(t *testing.T) { + _, _, err := splitTgByteString([]byte("a/b/c/d")) + if err == nil { + t.Error("Should not parse many parts") + return + } + if err.Error() != "Couldn't parse tg id pair" { + t.Error("Wrong parse error") + } +} + +func TestSplitTgByteNonNumeric(t *testing.T) { + _, _, err := splitTgByteString([]byte("0/a")) + if err == nil { + t.Error("Should not parse non-numeric msgid") + } +} + +func TestSplitTgByteSuccess(t *testing.T) { + chatId, msgId, err := splitTgByteString([]byte("-198282398/23798478")) + if err != nil { + t.Error("Should be parsed well") + } + if chatId != -198282398 { + t.Error("Wrong chatId") + } + if msgId != 23798478 { + t.Error("Wrong msgId") + } +} -- cgit v1.2.3 From 7215d11d7973b9896c6223938649c75165fa3ae7 Mon Sep 17 00:00:00 2001 From: Bohdan Horbeshko Date: Mon, 5 Jun 2023 04:22:13 -0400 Subject: XEP-0308 message editing --- badger/ids.go | 98 +++++++++++++++++++++++++++++++++++++++++++ telegram/handlers.go | 5 +++ telegram/utils.go | 39 +++++++++-------- xmpp/extensions/extensions.go | 17 ++++++++ xmpp/handlers.go | 43 ++++++++++++++++++- 5 files changed, 184 insertions(+), 18 deletions(-) (limited to 'badger') diff --git a/badger/ids.go b/badger/ids.go index 80fb9ad..079887a 100644 --- a/badger/ids.go +++ b/badger/ids.go @@ -121,6 +121,104 @@ func splitTgByteString(val []byte) (int64, int64, error) { return tgChatId, tgMsgId, err } +// ReplaceIdPair replaces an old entry by XMPP ID with both new XMPP and Tg ID +func (db *IdsDB) ReplaceIdPair(tgAccount, xmppAccount, oldXmppId, newXmppId string, newMsgId int64) error { + // read old pair + chatId, oldMsgId, err := db.GetByXmppId(tgAccount, xmppAccount, oldXmppId) + if err != nil { + return err + } + + bPrefix := toKeyPrefix(tgAccount, xmppAccount) + + bOldTgId := toTgByteString(chatId, oldMsgId) + bOldXmppId := toXmppByteString(oldXmppId) + bOldTgKey := toByteKey(bPrefix, bOldTgId, "tg") + bOldXmppKey := toByteKey(bPrefix, bOldXmppId, "xmpp") + + bTgId := toTgByteString(chatId, newMsgId) + bXmppId := toXmppByteString(newXmppId) + bTgKey := toByteKey(bPrefix, bTgId, "tg") + bXmppKey := toByteKey(bPrefix, bXmppId, "xmpp") + + return db.db.Update(func(txn *badger.Txn) error { + // save new pair + if err := txn.Set(bTgKey, bXmppId); err != nil { + return err + } + if err := txn.Set(bXmppKey, bTgId); err != nil { + return err + } + // delete old pair + if err := txn.Delete(bOldTgKey); err != nil { + return err + } + return txn.Delete(bOldXmppKey) + }) +} + +// ReplaceXmppId replaces an old XMPP ID with new XMPP ID and keeps Tg ID intact +func (db *IdsDB) ReplaceXmppId(tgAccount, xmppAccount, oldXmppId, newXmppId string) error { + // read old Tg IDs + chatId, msgId, err := db.GetByXmppId(tgAccount, xmppAccount, oldXmppId) + if err != nil { + return err + } + + bPrefix := toKeyPrefix(tgAccount, xmppAccount) + + bOldXmppId := toXmppByteString(oldXmppId) + bOldXmppKey := toByteKey(bPrefix, bOldXmppId, "xmpp") + + bTgId := toTgByteString(chatId, msgId) + bXmppId := toXmppByteString(newXmppId) + bTgKey := toByteKey(bPrefix, bTgId, "tg") + bXmppKey := toByteKey(bPrefix, bXmppId, "xmpp") + + return db.db.Update(func(txn *badger.Txn) error { + // save new pair + if err := txn.Set(bTgKey, bXmppId); err != nil { + return err + } + if err := txn.Set(bXmppKey, bTgId); err != nil { + return err + } + // delete old xmpp->tg entry + return txn.Delete(bOldXmppKey) + }) +} + +// ReplaceTgId replaces an old Tg ID with new Tg ID and keeps Tg chat ID and XMPP ID intact +func (db *IdsDB) ReplaceTgId(tgAccount, xmppAccount string, chatId, oldMsgId, newMsgId int64) error { + // read old XMPP ID + xmppId, err := db.GetByTgIds(tgAccount, xmppAccount, chatId, oldMsgId) + if err != nil { + return err + } + + bPrefix := toKeyPrefix(tgAccount, xmppAccount) + + bOldTgId := toTgByteString(chatId, oldMsgId) + bOldTgKey := toByteKey(bPrefix, bOldTgId, "tg") + + bTgId := toTgByteString(chatId, newMsgId) + bXmppId := toXmppByteString(xmppId) + bTgKey := toByteKey(bPrefix, bTgId, "tg") + bXmppKey := toByteKey(bPrefix, bXmppId, "xmpp") + + return db.db.Update(func(txn *badger.Txn) error { + // save new pair + if err := txn.Set(bTgKey, bXmppId); err != nil { + return err + } + if err := txn.Set(bXmppKey, bTgId); err != nil { + return err + } + // delete old tg->xmpp entry + return txn.Delete(bOldTgKey) + }) +} + // Gc compacts the value log func (db *IdsDB) Gc() { db.db.RunValueLogGC(0.7) diff --git a/telegram/handlers.go b/telegram/handlers.go index bd768ae..6de59e7 100644 --- a/telegram/handlers.go +++ b/telegram/handlers.go @@ -272,6 +272,11 @@ func (c *Client) updateAuthorizationState(update *client.UpdateAuthorizationStat // clean uploaded files func (c *Client) updateMessageSendSucceeded(update *client.UpdateMessageSendSucceeded) { + log.Debugf("replace message %v with %v", update.OldMessageId, update.Message.Id) + if err := gateway.IdsDB.ReplaceTgId(c.Session.Login, c.jid, update.Message.ChatId, update.OldMessageId, update.Message.Id); err != nil { + log.Error("failed to replace %v with %v: %v", update.OldMessageId, update.Message.Id, err.Error()) + } + file, _ := c.contentToFile(update.Message.Content) if file != nil && file.Local != nil { c.cleanTempFile(file.Local.Path) diff --git a/telegram/utils.go b/telegram/utils.go index dd63248..ecce6da 100644 --- a/telegram/utils.go +++ b/telegram/utils.go @@ -991,14 +991,14 @@ func (c *Client) PrepareOutgoingMessageContent(text string) client.InputMessageC return c.prepareOutgoingMessageContent(text, nil) } -// ProcessOutgoingMessage executes commands or sends messages to mapped chats -func (c *Client) ProcessOutgoingMessage(chatID int64, text string, returnJid string, id string, replyId int64) { +// ProcessOutgoingMessage executes commands or sends messages to mapped chats, returns message id +func (c *Client) ProcessOutgoingMessage(chatID int64, text string, returnJid string, replyId int64, replaceId int64) int64 { if !c.Online() { // we're offline - return + return 0 } - if strings.HasPrefix(text, "/") || strings.HasPrefix(text, "!") { + if replaceId == 0 && (strings.HasPrefix(text, "/") || strings.HasPrefix(text, "!")) { // try to execute commands response, isCommand := c.ProcessChatCommand(chatID, text) if response != "" { @@ -1006,7 +1006,7 @@ func (c *Client) ProcessOutgoingMessage(chatID int64, text string, returnJid str } // do not send on success if isCommand { - return + return 0 } } @@ -1014,7 +1014,7 @@ func (c *Client) ProcessOutgoingMessage(chatID int64, text string, returnJid str // quotations var reply int64 - if replyId == 0 { + if replaceId == 0 && replyId == 0 { replySlice := replyRegex.FindStringSubmatch(text) if len(replySlice) > 1 { reply, _ = strconv.ParseInt(replySlice[1], 10, 64) @@ -1069,24 +1069,29 @@ func (c *Client) ProcessOutgoingMessage(chatID int64, text string, returnJid str content := c.prepareOutgoingMessageContent(text, file) + if replaceId != 0 { + tgMessage, err := c.client.EditMessageText(&client.EditMessageTextRequest{ + ChatId: chatID, + MessageId: replaceId, + InputMessageContent: content, + }) + if err != nil { + c.returnError(returnJid, chatID, "Not edited", err) + return 0 + } + return tgMessage.Id + } + tgMessage, err := c.client.SendMessage(&client.SendMessageRequest{ ChatId: chatID, ReplyToMessageId: reply, InputMessageContent: content, }) if err != nil { - gateway.SendTextMessage( - returnJid, - strconv.FormatInt(chatID, 10), - fmt.Sprintf("Not sent: %s", err.Error()), - c.xmpp, - ) - } else { - err = gateway.IdsDB.Set(c.Session.Login, c.jid, tgMessage.ChatId, tgMessage.Id, id) - if err != nil { - log.Errorf("Failed to save ids %v/%v %v", tgMessage.ChatId, tgMessage.Id, id) - } + c.returnError(returnJid, chatID, "Not sent", err) + return 0 } + return tgMessage.Id } func (c *Client) returnMessage(returnJid string, chatID int64, text string) { diff --git a/xmpp/extensions/extensions.go b/xmpp/extensions/extensions.go index 78de47d..2d547af 100644 --- a/xmpp/extensions/extensions.go +++ b/xmpp/extensions/extensions.go @@ -180,6 +180,12 @@ type ClientMessage struct { Extensions []stanza.MsgExtension `xml:",omitempty"` } +// Replace is from XEP-0308 +type Replace struct { + XMLName xml.Name `xml:"urn:xmpp:message-correct:0 replace"` + Id string `xml:"id,attr"` +} + // Namespace is a namespace! func (c PresenceNickExtension) Namespace() string { return c.XMLName.Space @@ -225,6 +231,11 @@ func (c ComponentPrivilege) Namespace() string { return c.XMLName.Space } +// Namespace is a namespace! +func (c Replace) Namespace() string { + return c.XMLName.Space +} + // Name is a packet name func (ClientMessage) Name() string { return "message" @@ -291,4 +302,10 @@ func init() { "urn:xmpp:privilege:1", "privilege", }, ComponentPrivilege{}) + + // message edit + stanza.TypeRegistry.MapExtension(stanza.PKTMessage, xml.Name{ + "urn:xmpp:message-correct:0", + "replace", + }, Replace{}) } diff --git a/xmpp/handlers.go b/xmpp/handlers.go index 5178acd..51fd831 100644 --- a/xmpp/handlers.go +++ b/xmpp/handlers.go @@ -102,10 +102,13 @@ func HandleMessage(s xmpp.Sender, p stanza.Packet) { if ok { var reply extensions.Reply var fallback extensions.Fallback + var replace extensions.Replace msg.Get(&reply) msg.Get(&fallback) + msg.Get(&replace) log.Debugf("reply: %#v", reply) log.Debugf("fallback: %#v", fallback) + log.Debugf("replace: %#v", replace) var replyId int64 var err error @@ -138,8 +141,46 @@ func HandleMessage(s xmpp.Sender, p stanza.Packet) { text = text[:start] + text[end:] } } + var replaceId int64 + if replace.Id != "" { + chatId, msgId, err := gateway.IdsDB.GetByXmppId(session.Session.Login, bare, replace.Id) + if err == nil { + if chatId != toID { + gateway.SendTextMessage(msg.From, strconv.FormatInt(toID, 10), "", component) + return + } + replaceId = msgId + log.Debugf("replace tg: %#v %#v", chatId, msgId) + } else { + gateway.SendTextMessage(msg.From, strconv.FormatInt(toID, 10), "", component) + return + } + } - session.ProcessOutgoingMessage(toID, text, msg.From, msg.Id, replyId) + tgMessageId := session.ProcessOutgoingMessage(toID, text, msg.From, replyId, replaceId) + if tgMessageId != 0 { + if replaceId != 0 { + // not needed (is it persistent among clients though?) + /* err = gateway.IdsDB.ReplaceIdPair(session.Session.Login, bare, replace.Id, msg.Id, tgMessageId) + if err != nil { + log.Errorf("Failed to replace id %v with %v %v", replace.Id, msg.Id, tgMessageId) + } */ + } else { + err = gateway.IdsDB.Set(session.Session.Login, bare, toID, tgMessageId, msg.Id) + if err != nil { + log.Errorf("Failed to save ids %v/%v %v", toID, tgMessageId, msg.Id) + } + } + } else { + /* + // if a message failed to edit on Telegram side, match new XMPP ID with old Telegram ID anyway + if replaceId != 0 { + err = gateway.IdsDB.ReplaceXmppId(session.Session.Login, bare, replace.Id, msg.Id) + if err != nil { + log.Errorf("Failed to replace id %v with %v", replace.Id, msg.Id) + } + } */ + } return } else { toJid, err := stanza.NewJid(msg.To) -- cgit v1.2.3 From 00f1417cb2ed8199633a12989e9befb53298d407 Mon Sep 17 00:00:00 2001 From: Bohdan Horbeshko Date: Thu, 8 Jun 2023 13:33:22 -0400 Subject: Version 1.6.0 --- badger/ids.go | 2 +- telegabber.go | 2 +- telegram/commands.go | 2 +- telegram/handlers.go | 2 +- telegram/utils.go | 12 ++++++------ xmpp/handlers.go | 16 ++++++++-------- 6 files changed, 18 insertions(+), 18 deletions(-) (limited to 'badger') diff --git a/badger/ids.go b/badger/ids.go index 079887a..1295e8a 100644 --- a/badger/ids.go +++ b/badger/ids.go @@ -92,7 +92,7 @@ func toKeyPrefix(tgAccount, xmppAccount string) []byte { } func toByteKey(prefix, suffix []byte, typ string) []byte { - key := make([]byte, 0, len(prefix) + len(suffix) + 6) + key := make([]byte, 0, len(prefix)+len(suffix)+6) key = append(key, prefix...) key = append(key, []byte(typ)...) key = append(key, []byte("/")...) diff --git a/telegabber.go b/telegabber.go index d133d80..48db544 100644 --- a/telegabber.go +++ b/telegabber.go @@ -15,7 +15,7 @@ import ( goxmpp "gosrc.io/xmpp" ) -var version string = "1.6.0-dev" +var version string = "1.6.0" var commit string var sm *goxmpp.StreamManager diff --git a/telegram/commands.go b/telegram/commands.go index f36108a..5e16a6a 100644 --- a/telegram/commands.go +++ b/telegram/commands.go @@ -652,7 +652,7 @@ func (c *Client) ProcessChatCommand(chatID int64, cmdline string) (string, bool) keyValueString("Chat title", info.Fn), keyValueString("Photo", link), keyValueString("Username", info.Nickname), - keyValueString("Full name", info.Given + " " + info.Family), + keyValueString("Full name", info.Given+" "+info.Family), keyValueString("Phone number", info.Tel), } return strings.Join(entries, "\n"), true diff --git a/telegram/handlers.go b/telegram/handlers.go index 6de59e7..57402ab 100644 --- a/telegram/handlers.go +++ b/telegram/handlers.go @@ -274,7 +274,7 @@ func (c *Client) updateAuthorizationState(update *client.UpdateAuthorizationStat func (c *Client) updateMessageSendSucceeded(update *client.UpdateMessageSendSucceeded) { log.Debugf("replace message %v with %v", update.OldMessageId, update.Message.Id) if err := gateway.IdsDB.ReplaceTgId(c.Session.Login, c.jid, update.Message.ChatId, update.OldMessageId, update.Message.Id); err != nil { - log.Error("failed to replace %v with %v: %v", update.OldMessageId, update.Message.Id, err.Error()) + log.Errorf("failed to replace %v with %v: %v", update.OldMessageId, update.Message.Id, err.Error()) } file, _ := c.contentToFile(update.Message.Content) diff --git a/telegram/utils.go b/telegram/utils.go index a4a2bd4..58f7712 100644 --- a/telegram/utils.go +++ b/telegram/utils.go @@ -25,13 +25,13 @@ import ( ) type VCardInfo struct { - Fn string - Photo *client.File + Fn string + Photo *client.File Nickname string - Given string - Family string - Tel string - Info string + Given string + Family string + Tel string + Info string } var errOffline = errors.New("TDlib instance is offline") diff --git a/xmpp/handlers.go b/xmpp/handlers.go index 5034551..d5666b1 100644 --- a/xmpp/handlers.go +++ b/xmpp/handlers.go @@ -183,13 +183,13 @@ func HandleMessage(s xmpp.Sender, p stanza.Packet) { } } else { /* - // if a message failed to edit on Telegram side, match new XMPP ID with old Telegram ID anyway - if replaceId != 0 { - err = gateway.IdsDB.ReplaceXmppId(session.Session.Login, bare, replace.Id, msg.Id) - if err != nil { - log.Errorf("Failed to replace id %v with %v", replace.Id, msg.Id) - } - } */ + // if a message failed to edit on Telegram side, match new XMPP ID with old Telegram ID anyway + if replaceId != 0 { + err = gateway.IdsDB.ReplaceXmppId(session.Session.Login, bare, replace.Id, msg.Id) + if err != nil { + log.Errorf("Failed to replace id %v with %v", replace.Id, msg.Id) + } + } */ } return } else { @@ -225,7 +225,7 @@ func HandleMessage(s xmpp.Sender, p stanza.Packet) { suffix := "@" + msg.From for bare := range sessions { if strings.HasSuffix(bare, suffix) { - gateway.SendServiceMessage(bare, "Your server \"" + msg.From + "\" does not allow to send carbons", component) + gateway.SendServiceMessage(bare, "Your server \""+msg.From+"\" does not allow to send carbons", component) } } } -- cgit v1.2.3