aboutsummaryrefslogblamecommitdiff
path: root/xmpp/component.go
blob: fa1a650d270726008b2e7ccd7977b51cc3bbbb8f (plain) (tree)
1
2
3
4
5
6
7
8
9


            
                      
                               
              
 
                                                    
                                                         
                                                      
 
                                        
                                  
                       
                              

 

                                         

                                
                                       
                                     
                                 
 

                                                      
                                                                                                                   
                     
 

                                        
                                    



                   
                                   
                       
                                    

         







                                                     

                                                     



                                                            
                                    

         
                                                   
 
                               
 
                                 
 
 







                                                            
                                    
 
                                   






                                                                     
         
 

                                            










                                                          



































                                                                                                 



























































                                                                                                
                                                                               






















                                                      


                                                   
                                  
                 


                                                            

                  
 


















                                                                      
package xmpp

import (
	"encoding/xml"
	"github.com/pkg/errors"
	"time"

	"dev.narayana.im/narayana/telegabber/config"
	"dev.narayana.im/narayana/telegabber/persistence"
	"dev.narayana.im/narayana/telegabber/telegram"

	log "github.com/sirupsen/logrus"
	"github.com/soheilhy/args"
	"gosrc.io/xmpp"
	"gosrc.io/xmpp/stanza"
)

// pollingInterval is the pause between retries of an initial presence
// probe that could not be sent (10ms).
const pollingInterval = 10 * time.Millisecond

var (
	// jid is the component's own JID; it is assigned in NewComponent.
	jid *xmpp.Jid

	// tgConf is the Telegram configuration passed to every new Telegram client.
	tgConf config.TelegramConfig

	// sessions maps a user's JID to their Telegram client; it is (re)created
	// by loadSessions.
	sessions map[string]telegram.Client

	// queue holds presences awaiting delivery by heartbeat, keyed by the
	// concatenation of their From and To attributes. It is allocated eagerly
	// because sendPresence writes to it and a write to a nil map panics.
	queue = make(map[string]*stanza.Presence)

	// db is the persistent session store; it is assigned in loadSessions.
	db persistence.SessionsYamlDB
)

// NewComponent parses the component JID, restores the saved Telegram
// sessions, registers the iq/presence/message handlers and builds the XMPP
// component together with a stream manager wrapping it. The stream manager
// is returned unstarted; running it is the caller's job. The heartbeat
// goroutine is launched before returning.
func NewComponent(conf config.XMPPConfig, tc config.TelegramConfig) (*xmpp.StreamManager, *xmpp.Component, error) {
	var err error

	// jid and tgConf are package-level variables shared with the helpers below.
	if jid, err = xmpp.NewJid(conf.Jid); err != nil {
		return nil, nil, err
	}
	tgConf = tc

	if err = loadSessions(conf.Db); err != nil {
		return nil, nil, err
	}

	router := xmpp.NewRouter()
	router.HandleFunc("iq", HandleIq)
	router.HandleFunc("presence", HandlePresence)
	router.HandleFunc("message", HandleMessage)

	component, err := xmpp.NewComponent(xmpp.ComponentOptions{
		Address: conf.Host + ":" + conf.Port,
		Domain:  conf.Jid,
		Secret:  conf.Password,
		Name:    "telegabber",
	}, router)
	if err != nil {
		return nil, nil, err
	}

	manager := xmpp.NewStreamManager(component, nil)

	go heartbeat(component)

	return manager, component, nil
}

// logPresence reports, at error level, a presence that could not be sent.
// The presence value itself is attached as a structured log field and the
// underlying error is wrapped with a short description.
func logPresence(err error, presence *stanza.Presence) {
	fields := log.Fields{"presence": *presence}
	log.WithFields(fields).Error(errors.Wrap(err, "Couldn't send presence"))
}

// heartbeat first sends a presence probe to every JID that has a saved
// session, retrying each one every pollingInterval until it goes through.
// After that it loops forever, trying once a minute to deliver every
// presence left in the queue; entries are removed only after a successful
// send. It never returns, so it is meant to run in its own goroutine.
func heartbeat(component *xmpp.Component) {
	probe := SPType("probe")

	for userJid := range sessions {
		// keep retrying this probe until it is accepted
		for sendPresence(component, userJid, probe) != nil {
			time.Sleep(pollingInterval)
		}
	}

	log.Info("Starting heartbeat queue")

	for {
		for key, queued := range queue {
			if err := component.Send(queued); err != nil {
				logPresence(err, queued)
				continue
			}
			delete(queue, key)
		}
		time.Sleep(time.Minute)
	}
}

// loadSessions resets the in-memory session map, loads the persisted
// sessions from dbPath and creates a Telegram client for every stored entry.
// It returns the error from persistence.LoadSessions, if any; failures to
// create an individual client are only logged by getTelegramInstance.
func loadSessions(dbPath string) error {
	var err error

	sessions = make(map[string]telegram.Client)

	db, err = persistence.LoadSessions(dbPath)
	if err != nil {
		return err
	}

	db.Transaction(func() bool {
		for jid, session := range db.Data.Sessions {
			// Take a per-iteration copy before passing its address: with
			// Go versions before 1.22 the range variable is reused, so every
			// client would otherwise share one pointer holding whichever
			// session was iterated last.
			session := session
			getTelegramInstance(jid, &session)
		}

		// NOTE(review): false presumably tells Transaction there is nothing
		// to write back (Close returns true when saving) — confirm against
		// the persistence package.
		return false
	}, persistence.SessionMarshaller)

	return nil
}

// getTelegramInstance returns the Telegram client registered for jid,
// creating one from tgConf and savedSession and storing it in sessions when
// none exists yet. The boolean is false only when a new client had to be
// created and telegram.NewClient failed; the failure is logged here.
func getTelegramInstance(jid string, savedSession *persistence.Session) (telegram.Client, bool) {
	if session, ok := sessions[jid]; ok {
		return session, true
	}

	// Declared at function scope so the value returned below is the client
	// that was just created, not a shadowed zero value.
	session, err := telegram.NewClient(tgConf, jid, savedSession)
	if err != nil {
		log.Error(errors.Wrap(err, "TDlib initialization failure"))
		return session, false
	}
	sessions[jid] = session

	return session, true
}

// Optional arguments accepted by newPresence and sendPresence.
var (
	// SPFrom is a Telegram user id
	SPFrom = args.NewString()

	// SPType is a presence type
	SPType = args.NewString()

	// SPShow is an availability status
	SPShow = args.NewString()

	// SPStatus is a verbose status
	SPStatus = args.NewString()

	// SPNickname is an XEP-0172 nickname
	SPNickname = args.NewString()

	// SPPhoto is a XEP-0153 hash of the avatar in vCard
	SPPhoto = args.NewString()

	// SPImmed skips queueing; it defaults to true
	SPImmed = args.NewBool(args.Default(true))
)

// newPresence builds a presence stanza addressed to `to`. The sender is
// bareJid itself, or "<SPFrom>@<bareJid>" when SPFrom is supplied. Type,
// show and status are filled in from SPType, SPShow and SPStatus when
// present; SPNickname and SPPhoto each add the matching extension element.
func newPresence(bareJid string, to string, args ...args.V) stanza.Presence {
	from := bareJid
	if SPFrom.IsSet(args) {
		from = SPFrom.Get(args) + "@" + bareJid
	}

	p := stanza.Presence{Attrs: stanza.Attrs{From: from, To: to}}

	if SPType.IsSet(args) {
		p.Attrs.Type = stanza.StanzaType(SPType.Get(args))
	}
	if SPShow.IsSet(args) {
		p.Show = stanza.PresenceShow(SPShow.Get(args))
	}
	if SPStatus.IsSet(args) {
		p.Status = SPStatus.Get(args)
	}
	if SPNickname.IsSet(args) {
		nick := PresenceNickExtension{Text: SPNickname.Get(args)}
		p.Extensions = append(p.Extensions, nick)
	}
	if SPPhoto.IsSet(args) {
		photo := PresenceXVCardUpdatePhoto{Text: SPPhoto.Get(args)}
		p.Extensions = append(p.Extensions, PresenceXVCardUpdateExtension{Photo: photo})
	}

	return p
}

// sendPresence builds a presence from the component's bare JID to `to`
// using the supplied SP* options and either sends it right away (SPImmed,
// the default) or stores it in the queue for heartbeat to deliver later,
// keyed by From+To so a newer presence replaces an older queued one.
// It returns the send error for immediate presences (after logging it) and
// nil otherwise.
func sendPresence(component *xmpp.Component, to string, args ...args.V) error {
	bareJid := jid.Bare()

	logFrom := bareJid
	if SPFrom.IsSet(args) {
		logFrom = SPFrom.Get(args)
	}

	log.WithFields(log.Fields{
		"type": SPType.Get(args),
		"from": logFrom,
		"to":   to,
	}).Info("Got presence")

	presence := newPresence(bareJid, to, args...)

	// explicit check, as marshalling is expensive; >= also covers levels
	// more verbose than debug
	if log.GetLevel() >= log.DebugLevel {
		// Log the XML as text: handing the ([]byte, error) pair straight to
		// the logger prints the stanza as a list of byte values.
		if raw, err := xml.Marshal(presence); err != nil {
			log.Debug(errors.Wrap(err, "Couldn't marshal presence"))
		} else {
			log.Debug(string(raw))
		}
	}

	if !SPImmed.Get(args) {
		queue[presence.From+presence.To] = &presence
		return nil
	}

	if err := component.Send(presence); err != nil {
		logPresence(err, &presence)
		return err
	}

	return nil
}

// Close shuts the component down: every Telegram client is disconnected
// first, then the current session data of each client is copied into the
// session database inside a transaction, and finally the XMPP component
// itself is disconnected.
func Close(component *xmpp.Component) {
	log.Error("Disconnecting...")

	for _, client := range sessions {
		client.Disconnect()
	}

	// Copies the live session state into the database.
	saveAll := func() bool {
		for userJid, client := range sessions {
			db.Data.Sessions[userJid] = *client.Session
		}

		return true
	}
	db.Transaction(saveAll, persistence.SessionMarshaller)

	component.Disconnect()
}