summaryrefslogtreecommitdiff
path: root/pkg/fetcher/fetcher.go
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/fetcher/fetcher.go')
-rw-r--r--pkg/fetcher/fetcher.go149
1 files changed, 149 insertions, 0 deletions
diff --git a/pkg/fetcher/fetcher.go b/pkg/fetcher/fetcher.go
new file mode 100644
index 0000000..0a79a70
--- /dev/null
+++ b/pkg/fetcher/fetcher.go
@@ -0,0 +1,149 @@
+package fetcher
+
+import (
+ "context"
+ "encoding/base64"
+ "io"
+ "log"
+ "net/http"
+ "strings"
+ "time"
+
+ "gitrepo.ru/neonxp/idecnode/pkg/config"
+ "gitrepo.ru/neonxp/idecnode/pkg/idec"
+)
+
+type Fetcher struct {
+ idec *idec.IDEC
+ config *config.Config
+ client *http.Client
+}
+
+func New(i *idec.IDEC, cfg *config.Config) *Fetcher {
+ return &Fetcher{
+ idec: i,
+ config: cfg,
+ client: &http.Client{
+ Timeout: 60 * time.Second,
+ },
+ }
+}
+
+func (f *Fetcher) Run(ctx context.Context) error {
+ for _, node := range f.config.Fetch {
+ messagesToDownloads := []string{}
+ log.Println("fetching", node)
+ for _, echoID := range node.Echos {
+ missed, err := f.getMissedEchoMessages(node, echoID)
+ if err != nil {
+ return err
+ }
+ messagesToDownloads = append(messagesToDownloads, missed...)
+ }
+ if err := f.downloadMessages(node, messagesToDownloads); err != nil {
+ return err
+ }
+ }
+ log.Println("finished")
+ return nil
+}
+
+func (f *Fetcher) downloadMessages(node config.Node, messagesToDownloads []string) error {
+ var slice []string
+ for {
+ limit := min(20, len(messagesToDownloads))
+ if limit == 0 {
+ return nil
+ }
+ slice, messagesToDownloads = messagesToDownloads[:limit-1], messagesToDownloads[limit:]
+ if err := f.downloadMessagesChunk(node, slice); err != nil {
+ return err
+ }
+ }
+}
+
+func (f *Fetcher) getMissedEchoMessages(node config.Node, echoID string) ([]string, error) {
+ missed := []string{}
+ messages, err := f.idec.GetMessagesByEcho(echoID, 0, 0)
+ if err != nil {
+ return nil, err
+ }
+
+ messagesIndex := map[string]struct{}{}
+ for _, msgID := range messages {
+ messagesIndex[msgID] = struct{}{}
+ }
+
+ p := formatCommand(node, "u/e", echoID)
+ resp, err := f.client.Get(p)
+ if err != nil {
+ return nil, err
+ }
+ defer resp.Body.Close()
+ data, err := io.ReadAll(resp.Body)
+ if err != nil {
+ return nil, err
+ }
+ lines := strings.Split(string(data), "\n")
+ for _, line := range lines {
+ if strings.Contains(line, ".") {
+ // echo name
+ continue
+ }
+ if line == "" {
+ continue
+ }
+ if _, exist := messagesIndex[line]; !exist {
+ missed = append(missed, line)
+ }
+ }
+
+ return missed, nil
+}
+
+func (f *Fetcher) downloadMessagesChunk(node config.Node, messages []string) error {
+ p := formatCommand(node, "u/m", messages...)
+
+ resp, err := f.client.Get(p)
+ if err != nil {
+ return err
+ }
+ defer resp.Body.Close()
+ data, err := io.ReadAll(resp.Body)
+ if err != nil {
+ return err
+ }
+ lines := strings.Split(string(data), "\n")
+
+ for _, line := range lines {
+ if line == "" {
+ continue
+ }
+ p := strings.Split(line, ":")
+
+ rawMessage, err := base64.StdEncoding.DecodeString(p[1])
+ if err != nil {
+ return err
+ }
+ if err := f.idec.SaveBundleMessage(p[0], string(rawMessage)); err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+func formatCommand(node config.Node, method string, args ...string) string {
+ segments := []string{node.Addr, method}
+ segments = append(segments, args...)
+ p := strings.Join(segments, "/")
+
+ return p
+}
+
+func min(x, y int) int {
+ if x < y {
+ return x
+ }
+ return y
+}