diff options
Diffstat (limited to 'pkg/fetcher/fetcher.go')
-rw-r--r-- | pkg/fetcher/fetcher.go | 149 |
1 files changed, 149 insertions, 0 deletions
diff --git a/pkg/fetcher/fetcher.go b/pkg/fetcher/fetcher.go new file mode 100644 index 0000000..0a79a70 --- /dev/null +++ b/pkg/fetcher/fetcher.go @@ -0,0 +1,149 @@ +package fetcher + +import ( + "context" + "encoding/base64" + "io" + "log" + "net/http" + "strings" + "time" + + "gitrepo.ru/neonxp/idecnode/pkg/config" + "gitrepo.ru/neonxp/idecnode/pkg/idec" +) + +type Fetcher struct { + idec *idec.IDEC + config *config.Config + client *http.Client +} + +func New(i *idec.IDEC, cfg *config.Config) *Fetcher { + return &Fetcher{ + idec: i, + config: cfg, + client: &http.Client{ + Timeout: 60 * time.Second, + }, + } +} + +func (f *Fetcher) Run(ctx context.Context) error { + for _, node := range f.config.Fetch { + messagesToDownloads := []string{} + log.Println("fetching", node) + for _, echoID := range node.Echos { + missed, err := f.getMissedEchoMessages(node, echoID) + if err != nil { + return err + } + messagesToDownloads = append(messagesToDownloads, missed...) + } + if err := f.downloadMessages(node, messagesToDownloads); err != nil { + return err + } + } + log.Println("finished") + return nil +} + +func (f *Fetcher) downloadMessages(node config.Node, messagesToDownloads []string) error { + var slice []string + for { + limit := min(20, len(messagesToDownloads)) + if limit == 0 { + return nil + } + slice, messagesToDownloads = messagesToDownloads[:limit-1], messagesToDownloads[limit:] + if err := f.downloadMessagesChunk(node, slice); err != nil { + return err + } + } +} + +func (f *Fetcher) getMissedEchoMessages(node config.Node, echoID string) ([]string, error) { + missed := []string{} + messages, err := f.idec.GetMessagesByEcho(echoID, 0, 0) + if err != nil { + return nil, err + } + + messagesIndex := map[string]struct{}{} + for _, msgID := range messages { + messagesIndex[msgID] = struct{}{} + } + + p := formatCommand(node, "u/e", echoID) + resp, err := f.client.Get(p) + if err != nil { + return nil, err + } + defer resp.Body.Close() + data, err := io.ReadAll(resp.Body) + if err != nil { + return nil, err + } + lines := strings.Split(string(data), "\n") + for _, line := range lines { + if strings.Contains(line, ".") { + // echo name + continue + } + if line == "" { + continue + } + if _, exist := messagesIndex[line]; !exist { + missed = append(missed, line) + } + } + + return missed, nil +} + +func (f *Fetcher) downloadMessagesChunk(node config.Node, messages []string) error { + p := formatCommand(node, "u/m", messages...) + + resp, err := f.client.Get(p) + if err != nil { + return err + } + defer resp.Body.Close() + data, err := io.ReadAll(resp.Body) + if err != nil { + return err + } + lines := strings.Split(string(data), "\n") + + for _, line := range lines { + if line == "" { + continue + } + p := strings.Split(line, ":") + + rawMessage, err := base64.StdEncoding.DecodeString(p[1]) + if err != nil { + return err + } + if err := f.idec.SaveBundleMessage(p[0], string(rawMessage)); err != nil { + return err + } + } + + return nil +} + +func formatCommand(node config.Node, method string, args ...string) string { + segments := []string{node.Addr, method} + segments = append(segments, args...) + p := strings.Join(segments, "/") + + return p +} + +func min(x, y int) int { + if x < y { + return x + } + return y +} |