From e5d6f4c02b757c83244ba5e04fead08623a27299 Mon Sep 17 00:00:00 2001
From: Alexander Neonxp Kiryukhin
Date: Sat, 14 Mar 2026 00:44:19 +0300
Subject: =?UTF-8?q?=D0=BD=D0=B0=D1=87=D0=B0=D0=BB=D1=8C=D0=BD=D1=8B=D0=B9?=
=?UTF-8?q?=20=D0=BA=D0=BE=D0=BC=D0=BC=D0=B8=D1=82?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
internal/application/application.go | 148 ++++++++++++++++++++++++++++++++++++
internal/database/db.go | 54 +++++++++++++
internal/model/item.go | 24 ++++++
internal/source/feed.go | 95 +++++++++++++++++++++++
internal/target/html.go | 14 ++++
internal/target/telegram.go | 112 +++++++++++++++++++++++++++
6 files changed, 447 insertions(+)
create mode 100644 internal/application/application.go
create mode 100644 internal/database/db.go
create mode 100644 internal/model/item.go
create mode 100644 internal/source/feed.go
create mode 100644 internal/target/html.go
create mode 100644 internal/target/telegram.go
(limited to 'internal')
diff --git a/internal/application/application.go b/internal/application/application.go
new file mode 100644
index 0000000..d61f38c
--- /dev/null
+++ b/internal/application/application.go
@@ -0,0 +1,148 @@
+package application
+
+import (
+ "context"
+ "fmt"
+ "log/slog"
+ "os"
+ "sync"
+
+ cm "go.neonxp.ru/conf/model"
+ "go.neonxp.ru/pose/internal/database"
+ "go.neonxp.ru/pose/internal/model"
+ "go.neonxp.ru/pose/internal/source"
+ "go.neonxp.ru/pose/internal/target"
+)
+
+type Application struct {
+ logger *slog.Logger
+ db *database.DB
+ sources []Source
+ targets []Target
+ wg sync.WaitGroup
+}
+
+func New(cfg cm.Group, logger *slog.Logger) (*Application, error) {
+ dbfile := cfg.Get("db_file").StringExt("", os.LookupEnv)
+ db, err := database.New(dbfile)
+ if err != nil {
+ return nil, fmt.Errorf("failed create db: %w", err)
+ }
+
+ app := &Application{
+ logger: logger,
+ db: db,
+ sources: []Source{},
+ targets: []Target{},
+ wg: sync.WaitGroup{},
+ }
+
+ //Build sources
+ for _, it := range cfg.Get("sources").Group() {
+ name := it.String()
+ switch it.Name {
+ case cm.Ident("feed"):
+ src, err := source.NewFeed(
+ it.Group(),
+ app.logger.With(slog.String("source", name)),
+ )
+ if err != nil {
+ return nil, err
+ }
+ app.sources = append(app.sources, src)
+ default:
+ return nil, fmt.Errorf("unknown source type: %s", it.Name)
+ }
+ }
+
+ // Build targets
+ for _, it := range cfg.Get("targets").Group() {
+ name := it.String()
+ switch it.Name {
+ case cm.Ident("telegram"):
+ tgt, err := target.NewTelegram(
+ it.Group(),
+ app.logger.With(slog.String("target", name)),
+ )
+ if err != nil {
+ return nil, err
+ }
+ app.targets = append(app.targets, tgt)
+ default:
+ return nil, fmt.Errorf("unknown target type: %s", it.Name)
+ }
+ }
+
+ return app, nil
+}
+
+func (a *Application) Run(ctx context.Context) error {
+ defer a.wg.Wait()
+
+ // Сборный канал со всех источников
+ inChan := a.inChan(ctx)
+
+ // Срез каналов всех потребителей
+ outChans := a.outChans(ctx)
+
+ // Перекладывание сообщений из inChan во все outChans
+ for it := range inChan {
+ for _, ch := range outChans {
+ select {
+ case ch <- it:
+ slog.InfoContext(ctx, "trying send feed item to target", slog.String("id", it.ID))
+ default:
+ slog.WarnContext(ctx, "feed item ignored because no free targets")
+ }
+ }
+ }
+
+ return nil
+}
+
+func (a *Application) Close() error {
+ return a.db.Close()
+}
+
+func (a *Application) inChan(ctx context.Context) chan model.Item {
+ inChan := make(chan model.Item, 1)
+ a.wg.Go(func() {
+ <-ctx.Done()
+ close(inChan)
+ })
+
+ for _, s := range a.sources {
+ a.wg.Go(func() {
+ for it := range s.Retrive(ctx) {
+ if a.db.Exists(it.ID) {
+ continue
+ }
+ if err := a.db.Append(it.ID); err != nil {
+ a.logger.ErrorContext(ctx, "failed to add item to deduplication db", slog.Any("err", err))
+ continue
+ }
+
+ inChan <- it
+ }
+ })
+ }
+
+ return inChan
+}
+
+func (a *Application) outChans(ctx context.Context) []chan<- model.Item {
+ outChans := make([]chan<- model.Item, len(a.targets))
+
+ for i, t := range a.targets {
+ outChans[i] = t.Send(ctx)
+ }
+ return outChans
+}
+
+type Source interface {
+ Retrive(ctx context.Context) <-chan model.Item
+}
+
+type Target interface {
+ Send(ctx context.Context) chan<- model.Item
+}
diff --git a/internal/database/db.go b/internal/database/db.go
new file mode 100644
index 0000000..2fbfe74
--- /dev/null
+++ b/internal/database/db.go
@@ -0,0 +1,54 @@
+// Package database простейшая БД которая позволяет только два действия:
+//
+// 1. сохранить некую строку - `Append(string) error`
+// 2. ответить на вопрос сохранялась ли уже когда либо эта строка -
+// `Exists(string) bool`
+//
+// Нужна для дедупликации публикуемых записей
+package database
+
+import (
+ "bytes"
+ "fmt"
+ "os"
+)
+
+type DB struct {
+ fp *os.File
+ cache map[string]struct{}
+}
+
+func New(path string) (*DB, error) {
+ cache, _ := os.ReadFile(path) // Ignore if no DB file
+
+ fp, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644)
+ if err != nil {
+ return nil, fmt.Errorf("failed open db file for write: %w", err)
+ }
+ db := &DB{
+ fp: fp,
+ cache: map[string]struct{}{},
+ }
+ for part := range bytes.SplitSeq(cache, []byte("\n")) {
+ db.cache[string(part)] = struct{}{}
+ }
+
+ return db, nil
+}
+
+func (d *DB) Close() error {
+ return d.fp.Close()
+}
+
+func (d *DB) Exists(s string) bool {
+ _, ok := d.cache[s]
+ return ok
+}
+
+func (d *DB) Append(s string) error {
+ if d.Exists(s) {
+ return nil
+ }
+ _, err := fmt.Fprintln(d.fp, s)
+ return err
+}
diff --git a/internal/model/item.go b/internal/model/item.go
new file mode 100644
index 0000000..c18d830
--- /dev/null
+++ b/internal/model/item.go
@@ -0,0 +1,24 @@
+package model
+
+import (
+ "fmt"
+ "time"
+)
+
+type Item struct {
+ ID string
+ Date time.Time
+ Title string
+ Summary string
+ Link string
+ Img string
+}
+
+func (it *Item) BuildMessage() string {
+ return fmt.Sprintf(
+ `%s
%s
Читать дальше: %[3]s`,
+ it.Title,
+ it.Summary,
+ it.Link,
+ )
+}
diff --git a/internal/source/feed.go b/internal/source/feed.go
new file mode 100644
index 0000000..31f8535
--- /dev/null
+++ b/internal/source/feed.go
@@ -0,0 +1,95 @@
+package source
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "log/slog"
+ "time"
+
+ "github.com/mmcdole/gofeed"
+ cm "go.neonxp.ru/conf/model"
+ "go.neonxp.ru/pose/internal/model"
+)
+
+var (
+ ErrNoFeedAddress = errors.New("no feed address")
+)
+
+type Feed struct {
+ logger *slog.Logger
+ feed string
+ scrapeInterval time.Duration
+ feedParser *gofeed.Parser
+}
+
+func NewFeed(cfg cm.Group, logger *slog.Logger) (*Feed, error) {
+ feedSource := cfg.Get("url").String()
+ if feedSource == "" {
+ return nil, ErrNoFeedAddress
+ }
+ durStr := cfg.Get("scrape_interval").String()
+ if durStr == "" {
+ durStr = "10m"
+ }
+ dur, err := time.ParseDuration(durStr)
+ if err != nil {
+ return nil, fmt.Errorf("failed parse duration at source: %w", err)
+ }
+ return &Feed{
+ feed: feedSource,
+ scrapeInterval: dur,
+ feedParser: gofeed.NewParser(),
+ }, nil
+}
+
+func (a *Feed) Retrive(ctx context.Context) <-chan model.Item {
+ out := make(chan model.Item)
+ ticker := time.NewTicker(a.scrapeInterval)
+ go func() {
+ <-ctx.Done()
+ ticker.Stop()
+ }()
+ go func() {
+ defer close(out)
+ for _, item := range a.scrapeFeed(ctx) {
+ out <- item
+ }
+ for range ticker.C {
+ for _, item := range a.scrapeFeed(ctx) {
+ out <- item
+ }
+ }
+ }()
+
+ return out
+}
+
+func (a *Feed) scrapeFeed(ctx context.Context) []model.Item {
+ feed, err := a.feedParser.ParseURLWithContext(a.feed, ctx)
+ if err != nil {
+ a.logger.Error("failed parse feed", slog.Any("err", err))
+ return nil
+ }
+ result := make([]model.Item, 0, len(feed.Items))
+ for _, it := range feed.Items {
+ date := time.Now()
+ if it.PublishedParsed != nil {
+ date = *it.PublishedParsed
+ }
+ image := ""
+ if it.Image != nil {
+ image = it.Image.URL
+ }
+ result = append(result, model.Item{
+ ID: it.GUID,
+ Date: date,
+ Title: it.Title,
+ Summary: it.Description,
+ Link: it.Link,
+ Img: image,
+ })
+ }
+
+ return result
+}
diff --git a/internal/target/html.go b/internal/target/html.go
new file mode 100644
index 0000000..84d8662
--- /dev/null
+++ b/internal/target/html.go
@@ -0,0 +1,14 @@
+package target
+
+import "strings"
+
+func processHTML(html string) string {
+ html = strings.ReplaceAll(html, "\n", " ")
+ html = strings.ReplaceAll(html, "
", "\n")
+ html = strings.ReplaceAll(html, "
", "\n")
+ html = strings.ReplaceAll(html, "
", "\n")
+ html = strings.ReplaceAll(html, "
", "") + html = strings.ReplaceAll(html, "
", "\n") + + return html +} diff --git a/internal/target/telegram.go b/internal/target/telegram.go new file mode 100644 index 0000000..f4d4aff --- /dev/null +++ b/internal/target/telegram.go @@ -0,0 +1,112 @@ +package target + +import ( + "context" + "errors" + "fmt" + "io" + "log/slog" + "net/http" + "net/url" + "os" + "time" + + "github.com/microcosm-cc/bluemonday" + cm "go.neonxp.ru/conf/model" + "go.neonxp.ru/pose/internal/model" +) + +var ( + ErrNoToken = errors.New("no api token") + ErrNoGroup = errors.New("no group") +) + +const telegramRequestTimeout = 30 * time.Second +const telegramMaxItemsChan = 32 + +type Telegram struct { + logger *slog.Logger + apiToken string + group string + client *http.Client + policy *bluemonday.Policy +} + +func NewTelegram(cfg cm.Group, logger *slog.Logger) (*Telegram, error) { + token := cfg.Get("token").StringExt("", os.LookupEnv) + if token == "" { + return nil, ErrNoToken + } + group := cfg.Get("group").StringExt("", os.LookupEnv) + if group == "" { + return nil, ErrNoGroup + } + + pol := bluemonday.NewPolicy() + + pol.AllowAttrs("href").OnElements("a") + pol.AllowAttrs("class").OnElements("span") + pol.AllowElements("p", "br", "b", "strong", "i", "em", "u", "ins", "s", "strike", "del", "code", "pre", "blockquote") + + return &Telegram{ + logger: logger, + apiToken: token, + group: group, + client: &http.Client{Timeout: telegramRequestTimeout}, + policy: pol, + }, nil +} + +func (t *Telegram) Send(ctx context.Context) chan<- model.Item { + ch := make(chan model.Item, telegramMaxItemsChan) + go func() { + defer close(ch) + for { + select { + case <-ctx.Done(): + return + case item := <-ch: + if err := t.sendMessage(item); err != nil { + t.logger.ErrorContext(ctx, "failed send feed item to telegram", slog.Any("err", err)) + continue + } + t.logger.InfoContext(ctx, "send item to telegram", slog.String("id", item.ID)) + } + } + }() + + return ch +} + +func (t *Telegram) sendMessage(it model.Item) error { + sendMessageURL := fmt.Sprintf("https://api.telegram.org/bot%s/sendMessage", t.apiToken) + + message := it.BuildMessage() + message = t.policy.Sanitize(message) + message = processHTML(message) + + params := url.Values{} + params.Set("chat_id", t.group) + params.Set("text", message) + params.Set("parse_mode", "HTML") + + resp, err := t.client.PostForm(sendMessageURL, params) + if err != nil { + return err + } + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + msg, err := io.ReadAll(resp.Body) + if err != nil { + return fmt.Errorf("failed read error body: %w", err) + } + return fmt.Errorf( + "invalid status code %d (%s): %s", + resp.StatusCode, + resp.Status, + string(msg), + ) + } + + return nil +} -- cgit v1.2.3