aboutsummaryrefslogblamecommitdiff
path: root/xmpp/component.go
blob: fb5732e1b5a01f49c087f8f12281ddf6b1d7b2b6 (plain) (tree)
1
2
3
4
5
6
7
8
9


            
                               
              
              
 
                                                    
                                                         
                                                      
                                                          
 
                                        
                       
                              

 
                                
                                        
                                  
                          
 

                                                      
                                                                                                                   
                     
 
                                                  
                       
                                    



                   
                                         



                                                                    





                                       

                                                     

                                                   


                                                                              
                       
                                    

         
                                   




                                              


                                                                    
 
                                 
 
 

                                           
                                            
 
                          
                                   


                                                                     
                 
         
                            
 

                                            
                                
             
                                
                                                          
                                                                        
                                       
                                                                
                                
                                                        
                                                          
                                                          

                         






                                                                          
         

 
                                                                   

                     
                                                    







                                                            



                                                                                         







                                         

                                                                                                                             

                                    
                                                                                       



                                                                                   






                                                                  
                                       
                                    



                            
 












                                                                



                                                                      
                          
                             
                                          
                                            
         
                            
 
                        
                      
 
                       

                              
package xmpp

import (
	"github.com/pkg/errors"
	"sync"
	"time"

	"dev.narayana.im/narayana/telegabber/config"
	"dev.narayana.im/narayana/telegabber/persistence"
	"dev.narayana.im/narayana/telegabber/telegram"
	"dev.narayana.im/narayana/telegabber/xmpp/gateway"

	log "github.com/sirupsen/logrus"
	"gosrc.io/xmpp"
	"gosrc.io/xmpp/stanza"
)

// Package-level state shared by the component, its handlers and the heartbeat.
var (
	// tgConf is the Telegram configuration handed to NewComponent; it is
	// passed on to every telegram.NewClient call.
	tgConf config.TelegramConfig

	// sessions maps a user JID to its Telegram client.
	sessions map[string]*telegram.Client

	// db is the sessions database returned by persistence.LoadSessions.
	db *persistence.SessionsYamlDB

	// sessionLock is held while the sessions map is iterated or written.
	sessionLock sync.Mutex
)

// NewComponent builds the XMPP component described by conf and wraps it in a
// stream manager. The caller is responsible for starting that manager.
//
// Side effects: gateway.Jid is set from conf.Jid, the package-level Telegram
// configuration is replaced with tc, and all saved sessions are loaded from
// conf.Db. Any failure along the way is returned with nil manager/component.
func NewComponent(conf config.XMPPConfig, tc config.TelegramConfig) (*xmpp.StreamManager, *xmpp.Component, error) {
	var err error

	if gateway.Jid, err = stanza.NewJid(conf.Jid); err != nil {
		return nil, nil, err
	}

	tgConf = tc

	// Stanza dispatch: one handler per top-level stanza kind.
	router := xmpp.NewRouter()
	router.HandleFunc("iq", HandleIq)
	router.HandleFunc("presence", HandlePresence)
	router.HandleFunc("message", HandleMessage)

	transport := xmpp.TransportConfiguration{
		Address: conf.Host + ":" + conf.Port,
		Domain:  conf.Jid,
	}
	options := xmpp.ComponentOptions{
		TransportConfiguration: transport,
		Domain:                 conf.Jid,
		Secret:                 conf.Password,
		Name:                   "telegabber",
	}

	// Errors reported by the component are only logged.
	logStreamError := func(err error) {
		log.Error(err)
	}

	component, err := xmpp.NewComponent(options, router, logStreamError)
	if err != nil {
		return nil, nil, err
	}

	// probe all known sessions
	if err = loadSessions(conf.Db, component); err != nil {
		return nil, nil, err
	}

	// The heartbeat runs in its own goroutine every time the stream manager
	// invokes this callback (presumably after each successful connect).
	sm := xmpp.NewStreamManager(component, func(_ xmpp.Sender) {
		go heartbeat(component)
	})

	return sm, component, nil
}

// heartbeat sends a "probe" presence to every known session and then loops
// forever: once a minute it tries to deliver the presences waiting in
// gateway.Queue and, if gateway.DirtySessions is set, saves the sessions.
//
// It never returns, so it must be run in its own goroutine.
func heartbeat(component *xmpp.Component) {
	var err error
	probeType := gateway.SPType("probe")

	sessionLock.Lock()
	for jid := range sessions {
		err = gateway.SendPresence(component, jid, probeType)
		if err != nil {
			log.Error(err)
		}
	}
	sessionLock.Unlock()

	log.Info("Starting heartbeat queue")

	// status updater thread
	for {
		time.Sleep(60 * time.Second)

		// The queue is a shared map: hold its lock for the whole flush, not
		// only for the delete, so that a concurrent writer cannot race with
		// the iteration (which the Go runtime treats as a fatal error).
		// NOTE(review): assumes neither ResumableSend nor LogBadPresence
		// takes gateway.QueueLock themselves — confirm.
		gateway.QueueLock.Lock()
		for key, presence := range gateway.Queue {
			err = gateway.ResumableSend(component, presence)
			if err != nil {
				// keep the entry queued so it is retried on the next tick
				gateway.LogBadPresence(presence)
			} else {
				// deleting the current key while ranging is allowed in Go
				delete(gateway.Queue, key)
			}
		}
		gateway.QueueLock.Unlock()

		if gateway.DirtySessions {
			gateway.DirtySessions = false
			// no problem if a dirty flag gets set again here,
			// it would be resolved on the next iteration
			SaveSessions()
		}
	}
}

// loadSessions resets the in-memory session map, loads the sessions database
// from dbPath into the package-level db and creates a Telegram client for
// every saved session.
//
// Only a failure to load the database is returned; per-session failures are
// handled (logged) inside getTelegramInstance and do not abort the loop.
func loadSessions(dbPath string, component *xmpp.Component) error {
	sessions = make(map[string]*telegram.Client)

	loaded, err := persistence.LoadSessions(dbPath)
	db = loaded
	if err != nil {
		return err
	}

	restore := func() bool {
		for jid := range db.Data.Sessions {
			// Take a per-iteration copy of the stored struct so that every
			// client gets a pointer to its own session value.
			saved := db.Data.Sessions[jid]
			getTelegramInstance(jid, &saved, component)
		}

		// Nothing in the database was modified here; presumably false tells
		// Transaction that no write-back is needed — confirm in persistence.
		return false
	}
	db.Transaction(restore, persistence.SessionMarshaller)

	return nil
}

// getTelegramInstance returns the Telegram client registered for jid,
// creating and registering a new one from savedSession when none exists yet.
//
// The boolean result is true when a usable client is returned. It is false
// when the client could not be created, or when savedSession.KeepOnline is
// set and connecting failed; in both cases the error is logged and the
// client is not registered.
//
// NOTE(review): the function takes sessionLock itself, so it must not be
// called with sessionLock already held.
func getTelegramInstance(jid string, savedSession *persistence.Session, component *xmpp.Component) (*telegram.Client, bool) {
	// The lookup has to happen under the same lock as the insertion below,
	// otherwise it races with writers of the sessions map.
	sessionLock.Lock()
	session, ok := sessions[jid]
	sessionLock.Unlock()
	if ok {
		return session, true
	}

	session, err := telegram.NewClient(tgConf, jid, component, savedSession)
	if err != nil {
		log.Error(errors.Wrap(err, "TDlib initialization failure"))
		return session, false
	}

	if savedSession.KeepOnline {
		if err = session.Connect(""); err != nil {
			log.Error(err)
			return session, false
		}
	}

	// The lock is deliberately not held during client creation and connect,
	// only while the map itself is touched.
	sessionLock.Lock()
	sessions[jid] = session
	sessionLock.Unlock()

	return session, true
}

// SaveSessions copies the session data of every live client into the
// sessions database inside a database transaction. The session lock is held
// for the whole operation.
func SaveSessions() {
	sessionLock.Lock()
	defer sessionLock.Unlock()

	dump := func() bool {
		for key, client := range sessions {
			db.Data.Sessions[key] = *client.Session
		}

		// presumably true asks Transaction to persist the changes — confirm
		// against the persistence package
		return true
	}
	db.Transaction(dump, persistence.SessionMarshaller)
}

// Close shuts the gateway down: it disconnects every Telegram client, saves
// the sessions and finally disconnects the XMPP component.
func Close(component *xmpp.Component) {
	log.Error("Disconnecting...")

	// Disconnect all clients under the session lock. The lock is released
	// before SaveSessions, which acquires it again on its own.
	sessionLock.Lock()
	for jid := range sessions {
		sessions[jid].Disconnect("", true)
	}
	sessionLock.Unlock()

	// persist whatever state the clients ended up with
	SaveSessions()

	// finally drop the XMPP stream
	component.Disconnect()
}