diff options
-rw-r--r-- | telegram/client.go | 15 | ||||
-rw-r--r-- | telegram/commands.go | 4 | ||||
-rw-r--r-- | telegram/connect.go | 48 | ||||
-rw-r--r-- | telegram/utils.go | 34 | ||||
-rw-r--r-- | xmpp/component.go | 2 | ||||
-rw-r--r-- | xmpp/handlers.go | 11 |
6 files changed, 84 insertions, 30 deletions
diff --git a/telegram/client.go b/telegram/client.go index c4c97ff..097354d 100644 --- a/telegram/client.go +++ b/telegram/client.go @@ -42,12 +42,13 @@ type Client struct { options []client.Option me *client.User - xmpp *xmpp.Component - jid string - Session *persistence.Session - content *config.TelegramContentConfig - cache *cache.Cache - online bool + xmpp *xmpp.Component + jid string + Session *persistence.Session + resources map[string]bool + content *config.TelegramContentConfig + cache *cache.Cache + online bool locks clientLocks } @@ -55,6 +56,7 @@ type Client struct { type clientLocks struct { authorizationReady sync.WaitGroup chatMessageLocks map[int64]*sync.Mutex + resourcesLock sync.Mutex } // NewClient instantiates a Telegram App @@ -104,6 +106,7 @@ func NewClient(conf config.TelegramConfig, jid string, component *xmpp.Component xmpp: component, jid: jid, Session: session, + resources: make(map[string]bool), content: &conf.Content, cache: cache.NewCache(), options: options, diff --git a/telegram/commands.go b/telegram/commands.go index 0df64c1..4b0ee13 100644 --- a/telegram/commands.go +++ b/telegram/commands.go @@ -155,7 +155,7 @@ func (c *Client) usernameOrIDToID(username string) (int64, error) { // ProcessTransportCommand executes a command sent directly to the component // and returns a response -func (c *Client) ProcessTransportCommand(cmdline string) string { +func (c *Client) ProcessTransportCommand(cmdline string, resource string) string { cmd, args := parseCommand(cmdline) switch cmd { case "login", "code", "password": @@ -173,7 +173,7 @@ func (c *Client) ProcessTransportCommand(cmdline string) string { if wasSessionLoginEmpty && c.authorizer == nil { go func() { - err := c.Connect() + err := c.Connect(resource) if err != nil { log.Error(errors.Wrap(err, "TDlib connection failure")) } diff --git a/telegram/connect.go b/telegram/connect.go index 7221453..8cee376 100644 --- a/telegram/connect.go +++ b/telegram/connect.go @@ -86,11 +86,12 @@ func (stateHandler *clientAuthorizer) Close() { } // Connect starts TDlib connection -func (c *Client) Connect() error { +func (c *Client) Connect(resource string) error { // avoid conflict if another authorization is pending already c.locks.authorizationReady.Wait() if c.Online() { + c.refresh(resource) return nil } @@ -116,7 +117,6 @@ func (c *Client) Connect() error { } c.client = tdlibClient - c.locks.authorizationReady.Done() // stage 3: if a client is succesfully created, AuthorizationStateReady is already reached log.Warn("Authorization successful!") @@ -130,27 +130,41 @@ func (c *Client) Connect() error { go c.updateHandler() c.online = true + c.locks.authorizationReady.Done() + c.addResource(resource) - _, err = c.client.GetChats(&client.GetChatsRequest{ - OffsetOrder: client.JsonInt64(math.MaxInt64), - Limit: chatsLimit, - }) - if err != nil { - log.Errorf("Could not retrieve chats: %v", err) - } + go func() { + _, err = c.client.GetChats(&client.GetChatsRequest{ + OffsetOrder: client.JsonInt64(math.MaxInt64), + Limit: chatsLimit, + }) + if err != nil { + log.Errorf("Could not retrieve chats: %v", err) + } - gateway.SendPresence(c.xmpp, c.jid, gateway.SPType("subscribe")) - gateway.SendPresence(c.xmpp, c.jid, gateway.SPType("subscribed")) - gateway.SendPresence(c.xmpp, c.jid, gateway.SPStatus("Logged in as: "+c.Session.Login)) + gateway.SendPresence(c.xmpp, c.jid, gateway.SPType("subscribe")) + gateway.SendPresence(c.xmpp, c.jid, gateway.SPType("subscribed")) + gateway.SendPresence(c.xmpp, c.jid, gateway.SPStatus("Logged in as: "+c.Session.Login)) + }() return nil } -// Disconnect drops TDlib connection -func (c *Client) Disconnect() { +// Disconnect drops TDlib connection and +// returns the flag indicating if disconnecting is permitted +func (c *Client) Disconnect(resource string, quit bool) bool { + if !quit { + c.deleteResource(resource) + } + // other resources are still active + if len(c.resources) > 0 && !quit { + log.Infof("Resource %v for account %v has disconnected, %v remaining", resource, c.Session.Login, len(c.resources)) + log.Debugf("Resources: %#v", c.resources) + return false + } // already disconnected if !c.Online() { - return + return false } log.Warn("Disconnecting from Telegram network...") @@ -168,8 +182,10 @@ func (c *Client) Disconnect() { _, err := c.client.Close() if err != nil { log.Errorf("Couldn't close the Telegram instance: %v; %#v", err, c) - c.forceClose() } + c.forceClose() + + return true } func (c *Client) interactor() { diff --git a/telegram/utils.go b/telegram/utils.go index 865c4d4..641f6d6 100644 --- a/telegram/utils.go +++ b/telegram/utils.go @@ -580,3 +580,37 @@ func (c *Client) ProcessOutgoingMessage(chatID int64, text string, messageID int func (c *Client) StatusesRange() chan *cache.Status { return c.cache.StatusesRange() } + +func (c *Client) addResource(resource string) { + if resource == "" { + return + } + c.locks.resourcesLock.Lock() + defer c.locks.resourcesLock.Unlock() + + c.resources[resource] = true +} + +func (c *Client) deleteResource(resource string) { + c.locks.resourcesLock.Lock() + defer c.locks.resourcesLock.Unlock() + + if _, ok := c.resources[resource]; ok { + delete(c.resources, resource) + } +} + +// refresh roster +func (c *Client) refresh(resource string) { + if _, ok := c.resources[resource]; ok { + return + } + + log.Warnf("Refreshing roster for resource %v", resource) + + for _, chat := range c.cache.ChatsKeys() { + c.ProcessStatusUpdate(chat, "", "") + } + + c.addResource(resource) +} diff --git a/xmpp/component.go b/xmpp/component.go index d1d0ddf..109d3d5 100644 --- a/xmpp/component.go +++ b/xmpp/component.go @@ -134,7 +134,7 @@ func Close(component *xmpp.Component) { // close all sessions for _, session := range sessions { - session.Disconnect() + session.Disconnect("", true) } // save sessions diff --git a/xmpp/handlers.go b/xmpp/handlers.go index ee767ad..59d4c62 100644 --- a/xmpp/handlers.go +++ b/xmpp/handlers.go @@ -85,7 +85,7 @@ func HandleMessage(s xmpp.Sender, p stanza.Packet) { }).Error(errors.Wrap(err, "Invalid to JID!")) } else if toID == gateway.Jid.Bare() { if strings.HasPrefix(msg.Body, "/") { - response := session.ProcessTransportCommand(msg.Body) + response := session.ProcessTransportCommand(msg.Body, fromJid.Resource) if response != "" { gateway.SendMessage(msg.From, "", response, component) } @@ -169,17 +169,18 @@ func handlePresence(s xmpp.Sender, p stanza.Presence) { switch p.Type { // destroy session case "unsubscribed", "unsubscribe": - session.Disconnect() - delete(sessions, bareFromJid) + if session.Disconnect(fromJid.Resource, false) { + delete(sessions, bareFromJid) + } // go offline case "unavailable", "error": - session.Disconnect() + session.Disconnect(fromJid.Resource, false) // go online case "probe", "", "online": // due to the weird implementation of go-tdlib wrapper, it won't // return the client instance until successful authorization go func() { - err = session.Connect() + err = session.Connect(fromJid.Resource) if err != nil { log.Error(errors.Wrap(err, "TDlib connection failure")) } else { |