diff options
| author | 2026-03-14 00:44:19 +0300 | |
|---|---|---|
| committer | 2026-03-14 00:44:19 +0300 | |
| commit | e5d6f4c02b757c83244ba5e04fead08623a27299 (patch) | |
| tree | 5b5babb9887cafa3dbc165928dc2b0fd65265bda /internal | |
| download | pose-master.tar.gz pose-master.tar.bz2 pose-master.tar.xz pose-master.zip | |
Diffstat (limited to '')
| -rw-r--r-- | internal/application/application.go | 148 | ||||
| -rw-r--r-- | internal/database/db.go | 54 | ||||
| -rw-r--r-- | internal/model/item.go | 24 | ||||
| -rw-r--r-- | internal/source/feed.go | 95 | ||||
| -rw-r--r-- | internal/target/html.go | 14 | ||||
| -rw-r--r-- | internal/target/telegram.go | 112 |
6 files changed, 447 insertions, 0 deletions
diff --git a/internal/application/application.go b/internal/application/application.go new file mode 100644 index 0000000..d61f38c --- /dev/null +++ b/internal/application/application.go @@ -0,0 +1,148 @@ +package application + +import ( + "context" + "fmt" + "log/slog" + "os" + "sync" + + cm "go.neonxp.ru/conf/model" + "go.neonxp.ru/pose/internal/database" + "go.neonxp.ru/pose/internal/model" + "go.neonxp.ru/pose/internal/source" + "go.neonxp.ru/pose/internal/target" +) + +type Application struct { + logger *slog.Logger + db *database.DB + sources []Source + targets []Target + wg sync.WaitGroup +} + +func New(cfg cm.Group, logger *slog.Logger) (*Application, error) { + dbfile := cfg.Get("db_file").StringExt("", os.LookupEnv) + db, err := database.New(dbfile) + if err != nil { + return nil, fmt.Errorf("failed create db: %w", err) + } + + app := &Application{ + logger: logger, + db: db, + sources: []Source{}, + targets: []Target{}, + wg: sync.WaitGroup{}, + } + + //Build sources + for _, it := range cfg.Get("sources").Group() { + name := it.String() + switch it.Name { + case cm.Ident("feed"): + src, err := source.NewFeed( + it.Group(), + app.logger.With(slog.String("source", name)), + ) + if err != nil { + return nil, err + } + app.sources = append(app.sources, src) + default: + return nil, fmt.Errorf("unknown source type: %s", it.Name) + } + } + + // Build targets + for _, it := range cfg.Get("targets").Group() { + name := it.String() + switch it.Name { + case cm.Ident("telegram"): + tgt, err := target.NewTelegram( + it.Group(), + app.logger.With(slog.String("target", name)), + ) + if err != nil { + return nil, err + } + app.targets = append(app.targets, tgt) + default: + return nil, fmt.Errorf("unknown target type: %s", it.Name) + } + } + + return app, nil +} + +func (a *Application) Run(ctx context.Context) error { + defer a.wg.Wait() + + // Сборный канал со всех источников + inChan := a.inChan(ctx) + + // Срез каналов всех потребителей + outChans := a.outChans(ctx) + + // Перекладывание сообщений из inChan во все outChans + for it := range inChan { + for _, ch := range outChans { + select { + case ch <- it: + slog.InfoContext(ctx, "trying send feed item to target", slog.String("id", it.ID)) + default: + slog.WarnContext(ctx, "feed item ignored because no free targets") + } + } + } + + return nil +} + +func (a *Application) Close() error { + return a.db.Close() +} + +func (a *Application) inChan(ctx context.Context) chan model.Item { + inChan := make(chan model.Item, 1) + a.wg.Go(func() { + <-ctx.Done() + close(inChan) + }) + + for _, s := range a.sources { + a.wg.Go(func() { + for it := range s.Retrive(ctx) { + if a.db.Exists(it.ID) { + continue + } + if err := a.db.Append(it.ID); err != nil { + a.logger.ErrorContext(ctx, "failed to add item to deduplication db", slog.Any("err", err)) + continue + } + + inChan <- it + } + }) + } + + return inChan +} + +func (a *Application) outChans(ctx context.Context) []chan<- model.Item { + outChans := make([]chan<- model.Item, len(a.targets)) + + for i, t := range a.targets { + outChans[i] = t.Send(ctx) + } + return outChans +} + +type Source interface { + Retrive(ctx context.Context) <-chan model.Item +} + +type Target interface { + Send(ctx context.Context) chan<- model.Item +} diff --git a/internal/database/db.go b/internal/database/db.go new file mode 100644 index 0000000..2fbfe74 --- /dev/null +++ b/internal/database/db.go @@ -0,0 +1,54 @@ +// Package database простейшая БД которая позволяет только два действия: +// +// 1. сохранить некую строку - `Append(string) error` +// 2. ответить на вопрос сохранялась ли уже когда либо эта строка - +// `Exists(string) bool` +// +// Нужна для дедупликации публикуемых записей +package database + +import ( + "bytes" + "fmt" + "os" +) + +type DB struct { + fp *os.File + cache map[string]struct{} +} + +func New(path string) (*DB, error) { + cache, _ := os.ReadFile(path) // Ignore if no DB file + + fp, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644) + if err != nil { + return nil, fmt.Errorf("failed open db file for write: %w", err) + } + db := &DB{ + fp: fp, + cache: map[string]struct{}{}, + } + for part := range bytes.SplitSeq(cache, []byte("\n")) { + db.cache[string(part)] = struct{}{} + } + + return db, nil +} + +func (d *DB) Close() error { + return d.fp.Close() +} + +func (d *DB) Exists(s string) bool { + _, ok := d.cache[s] + return ok +} + +func (d *DB) Append(s string) error { + if d.Exists(s) { + return nil + } + _, err := fmt.Fprintln(d.fp, s) + return err +} diff --git a/internal/model/item.go b/internal/model/item.go new file mode 100644 index 0000000..c18d830 --- /dev/null +++ b/internal/model/item.go @@ -0,0 +1,24 @@ +package model + +import ( + "fmt" + "time" +) + +type Item struct { + ID string + Date time.Time + Title string + Summary string + Link string + Img string +} + +func (it *Item) BuildMessage() string { + return fmt.Sprintf( + `<b>%s</b><br /><br />%s<br />Читать дальше: <a href="%[3]s">%[3]s</a>`, + it.Title, + it.Summary, + it.Link, + ) +} diff --git a/internal/source/feed.go b/internal/source/feed.go new file mode 100644 index 0000000..31f8535 --- /dev/null +++ b/internal/source/feed.go @@ -0,0 +1,95 @@ +package source + +import ( + "context" + "errors" + "fmt" + "log/slog" + "time" + + "github.com/mmcdole/gofeed" + cm "go.neonxp.ru/conf/model" + "go.neonxp.ru/pose/internal/model" +) + +var ( + ErrNoFeedAddress = errors.New("no feed address") +) + +type Feed struct { + logger *slog.Logger + feed string + scrapeInterval time.Duration + feedParser *gofeed.Parser +} + +func NewFeed(cfg cm.Group, logger *slog.Logger) (*Feed, error) { + feedSource := cfg.Get("url").String() + if feedSource == "" { + return nil, ErrNoFeedAddress + } + durStr := cfg.Get("scrape_interval").String() + if durStr == "" { + durStr = "10m" + } + dur, err := time.ParseDuration(durStr) + if err != nil { + return nil, fmt.Errorf("failed parse duration at source: %w", err) + } + return &Feed{ + feed: feedSource, + scrapeInterval: dur, + feedParser: gofeed.NewParser(), + }, nil +} + +func (a *Feed) Retrive(ctx context.Context) <-chan model.Item { + out := make(chan model.Item) + ticker := time.NewTicker(a.scrapeInterval) + go func() { + <-ctx.Done() + ticker.Stop() + }() + go func() { + defer close(out) + for _, item := range a.scrapeFeed(ctx) { + out <- item + } + for range ticker.C { + for _, item := range a.scrapeFeed(ctx) { + out <- item + } + } + }() + + return out +} + +func (a *Feed) scrapeFeed(ctx context.Context) []model.Item { + feed, err := a.feedParser.ParseURLWithContext(a.feed, ctx) + if err != nil { + a.logger.Error("failed parse feed", slog.Any("err", err)) + return nil + } + result := make([]model.Item, 0, len(feed.Items)) + for _, it := range feed.Items { + date := time.Now() + if it.PublishedParsed != nil { + date = *it.PublishedParsed + } + image := "" + if it.Image != nil { + image = it.Image.URL + } + result = append(result, model.Item{ + ID: it.GUID, + Date: date, + Title: it.Title, + Summary: it.Description, + Link: it.Link, + Img: image, + }) + } + + return result +} diff --git a/internal/target/html.go b/internal/target/html.go new file mode 100644 index 0000000..84d8662 --- /dev/null +++ b/internal/target/html.go @@ -0,0 +1,14 @@ +package target + +import "strings" + +func processHTML(html string) string { + html = strings.ReplaceAll(html, "\n", " ") + html = strings.ReplaceAll(html, "<br>", "\n") + html = strings.ReplaceAll(html, "<br />", "\n") + html = strings.ReplaceAll(html, "<br/>", "\n") + html = strings.ReplaceAll(html, "<p>", "") + html = strings.ReplaceAll(html, "</p>", "\n") + + return html +} diff --git a/internal/target/telegram.go b/internal/target/telegram.go new file mode 100644 index 0000000..f4d4aff --- /dev/null +++ b/internal/target/telegram.go @@ -0,0 +1,112 @@ +package target + +import ( + "context" + "errors" + "fmt" + "io" + "log/slog" + "net/http" + "net/url" + "os" + "time" + + "github.com/microcosm-cc/bluemonday" + cm "go.neonxp.ru/conf/model" + "go.neonxp.ru/pose/internal/model" +) + +var ( + ErrNoToken = errors.New("no api token") + ErrNoGroup = errors.New("no group") +) + +const telegramRequestTimeout = 30 * time.Second +const telegramMaxItemsChan = 32 + +type Telegram struct { + logger *slog.Logger + apiToken string + group string + client *http.Client + policy *bluemonday.Policy +} + +func NewTelegram(cfg cm.Group, logger *slog.Logger) (*Telegram, error) { + token := cfg.Get("token").StringExt("", os.LookupEnv) + if token == "" { + return nil, ErrNoToken + } + group := cfg.Get("group").StringExt("", os.LookupEnv) + if group == "" { + return nil, ErrNoGroup + } + + pol := bluemonday.NewPolicy() + + pol.AllowAttrs("href").OnElements("a") + pol.AllowAttrs("class").OnElements("span") + pol.AllowElements("p", "br", "b", "strong", "i", "em", "u", "ins", "s", "strike", "del", "code", "pre", "blockquote") + + return &Telegram{ + logger: logger, + apiToken: token, + group: group, + client: &http.Client{Timeout: telegramRequestTimeout}, + policy: pol, + }, nil +} + +func (t *Telegram) Send(ctx context.Context) chan<- model.Item { + ch := make(chan model.Item, telegramMaxItemsChan) + go func() { + defer close(ch) + for { + select { + case <-ctx.Done(): + return + case item := <-ch: + if err := t.sendMessage(item); err != nil { + t.logger.ErrorContext(ctx, "failed send feed item to telegram", slog.Any("err", err)) + continue + } + t.logger.InfoContext(ctx, "send item to telegram", slog.String("id", item.ID)) + } + } + }() + + return ch +} + +func (t *Telegram) sendMessage(it model.Item) error { + sendMessageURL := fmt.Sprintf("https://api.telegram.org/bot%s/sendMessage", t.apiToken) + + message := it.BuildMessage() + message = t.policy.Sanitize(message) + message = processHTML(message) + + params := url.Values{} + params.Set("chat_id", t.group) + params.Set("text", message) + params.Set("parse_mode", "HTML") + + resp, err := t.client.PostForm(sendMessageURL, params) + if err != nil { + return err + } + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + msg, err := io.ReadAll(resp.Body) + if err != nil { + return fmt.Errorf("failed read error body: %w", err) + } + return fmt.Errorf( + "invalid status code %d (%s): %s", + resp.StatusCode, + resp.Status, + string(msg), + ) + } + + return nil +} |
