aboutsummaryrefslogtreecommitdiff
path: root/internal
diff options
context:
space:
mode:
author2026-03-14 00:44:19 +0300
committer2026-03-14 00:44:19 +0300
commite5d6f4c02b757c83244ba5e04fead08623a27299 (patch)
tree5b5babb9887cafa3dbc165928dc2b0fd65265bda /internal
downloadpose-master.tar.gz
pose-master.tar.bz2
pose-master.tar.xz
pose-master.zip
начальный коммитHEADmaster
Diffstat (limited to 'internal')
-rw-r--r--internal/application/application.go148
-rw-r--r--internal/database/db.go54
-rw-r--r--internal/model/item.go24
-rw-r--r--internal/source/feed.go95
-rw-r--r--internal/target/html.go14
-rw-r--r--internal/target/telegram.go112
6 files changed, 447 insertions, 0 deletions
diff --git a/internal/application/application.go b/internal/application/application.go
new file mode 100644
index 0000000..d61f38c
--- /dev/null
+++ b/internal/application/application.go
@@ -0,0 +1,148 @@
+package application
+
+import (
+ "context"
+ "fmt"
+ "log/slog"
+ "os"
+ "sync"
+
+ cm "go.neonxp.ru/conf/model"
+ "go.neonxp.ru/pose/internal/database"
+ "go.neonxp.ru/pose/internal/model"
+ "go.neonxp.ru/pose/internal/source"
+ "go.neonxp.ru/pose/internal/target"
+)
+
+type Application struct {
+ logger *slog.Logger
+ db *database.DB
+ sources []Source
+ targets []Target
+ wg sync.WaitGroup
+}
+
+func New(cfg cm.Group, logger *slog.Logger) (*Application, error) {
+ dbfile := cfg.Get("db_file").StringExt("", os.LookupEnv)
+ db, err := database.New(dbfile)
+ if err != nil {
+ return nil, fmt.Errorf("failed create db: %w", err)
+ }
+
+ app := &Application{
+ logger: logger,
+ db: db,
+ sources: []Source{},
+ targets: []Target{},
+ wg: sync.WaitGroup{},
+ }
+
+ //Build sources
+ for _, it := range cfg.Get("sources").Group() {
+ name := it.String()
+ switch it.Name {
+ case cm.Ident("feed"):
+ src, err := source.NewFeed(
+ it.Group(),
+ app.logger.With(slog.String("source", name)),
+ )
+ if err != nil {
+ return nil, err
+ }
+ app.sources = append(app.sources, src)
+ default:
+ return nil, fmt.Errorf("unknown source type: %s", it.Name)
+ }
+ }
+
+ // Build targets
+ for _, it := range cfg.Get("targets").Group() {
+ name := it.String()
+ switch it.Name {
+ case cm.Ident("telegram"):
+ tgt, err := target.NewTelegram(
+ it.Group(),
+ app.logger.With(slog.String("target", name)),
+ )
+ if err != nil {
+ return nil, err
+ }
+ app.targets = append(app.targets, tgt)
+ default:
+ return nil, fmt.Errorf("unknown target type: %s", it.Name)
+ }
+ }
+
+ return app, nil
+}
+
+func (a *Application) Run(ctx context.Context) error {
+ defer a.wg.Wait()
+
+ // Сборный канал со всех источников
+ inChan := a.inChan(ctx)
+
+ // Срез каналов всех потребителей
+ outChans := a.outChans(ctx)
+
+ // Перекладывание сообщений из inChan во все outChans
+ for it := range inChan {
+ for _, ch := range outChans {
+ select {
+ case ch <- it:
+ slog.InfoContext(ctx, "trying send feed item to target", slog.String("id", it.ID))
+ default:
+ slog.WarnContext(ctx, "feed item ignored because no free targets")
+ }
+ }
+ }
+
+ return nil
+}
+
+func (a *Application) Close() error {
+ return a.db.Close()
+}
+
+func (a *Application) inChan(ctx context.Context) chan model.Item {
+ inChan := make(chan model.Item, 1)
+ a.wg.Go(func() {
+ <-ctx.Done()
+ close(inChan)
+ })
+
+ for _, s := range a.sources {
+ a.wg.Go(func() {
+ for it := range s.Retrive(ctx) {
+ if a.db.Exists(it.ID) {
+ continue
+ }
+ if err := a.db.Append(it.ID); err != nil {
+ a.logger.ErrorContext(ctx, "failed to add item to deduplication db", slog.Any("err", err))
+ continue
+ }
+
+ inChan <- it
+ }
+ })
+ }
+
+ return inChan
+}
+
+func (a *Application) outChans(ctx context.Context) []chan<- model.Item {
+ outChans := make([]chan<- model.Item, len(a.targets))
+
+ for i, t := range a.targets {
+ outChans[i] = t.Send(ctx)
+ }
+ return outChans
+}
+
+type Source interface {
+ Retrive(ctx context.Context) <-chan model.Item
+}
+
+type Target interface {
+ Send(ctx context.Context) chan<- model.Item
+}
diff --git a/internal/database/db.go b/internal/database/db.go
new file mode 100644
index 0000000..2fbfe74
--- /dev/null
+++ b/internal/database/db.go
@@ -0,0 +1,54 @@
+// Package database простейшая БД которая позволяет только два действия:
+//
+// 1. сохранить некую строку - `Append(string) error`
+// 2. ответить на вопрос сохранялась ли уже когда либо эта строка -
+// `Exists(string) bool`
+//
+// Нужна для дедупликации публикуемых записей
+package database
+
+import (
+ "bytes"
+ "fmt"
+ "os"
+)
+
+type DB struct {
+ fp *os.File
+ cache map[string]struct{}
+}
+
+func New(path string) (*DB, error) {
+ cache, _ := os.ReadFile(path) // Ignore if no DB file
+
+ fp, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644)
+ if err != nil {
+ return nil, fmt.Errorf("failed open db file for write: %w", err)
+ }
+ db := &DB{
+ fp: fp,
+ cache: map[string]struct{}{},
+ }
+ for part := range bytes.SplitSeq(cache, []byte("\n")) {
+ db.cache[string(part)] = struct{}{}
+ }
+
+ return db, nil
+}
+
+func (d *DB) Close() error {
+ return d.fp.Close()
+}
+
+func (d *DB) Exists(s string) bool {
+ _, ok := d.cache[s]
+ return ok
+}
+
+func (d *DB) Append(s string) error {
+ if d.Exists(s) {
+ return nil
+ }
+ _, err := fmt.Fprintln(d.fp, s)
+ return err
+}
diff --git a/internal/model/item.go b/internal/model/item.go
new file mode 100644
index 0000000..c18d830
--- /dev/null
+++ b/internal/model/item.go
@@ -0,0 +1,24 @@
+package model
+
+import (
+ "fmt"
+ "time"
+)
+
+type Item struct {
+ ID string
+ Date time.Time
+ Title string
+ Summary string
+ Link string
+ Img string
+}
+
+func (it *Item) BuildMessage() string {
+ return fmt.Sprintf(
+ `<b>%s</b><br /><br />%s<br />Читать дальше: <a href="%[3]s">%[3]s</a>`,
+ it.Title,
+ it.Summary,
+ it.Link,
+ )
+}
diff --git a/internal/source/feed.go b/internal/source/feed.go
new file mode 100644
index 0000000..31f8535
--- /dev/null
+++ b/internal/source/feed.go
@@ -0,0 +1,95 @@
+package source
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "log/slog"
+ "time"
+
+ "github.com/mmcdole/gofeed"
+ cm "go.neonxp.ru/conf/model"
+ "go.neonxp.ru/pose/internal/model"
+)
+
+var (
+ ErrNoFeedAddress = errors.New("no feed address")
+)
+
+type Feed struct {
+ logger *slog.Logger
+ feed string
+ scrapeInterval time.Duration
+ feedParser *gofeed.Parser
+}
+
+func NewFeed(cfg cm.Group, logger *slog.Logger) (*Feed, error) {
+ feedSource := cfg.Get("url").String()
+ if feedSource == "" {
+ return nil, ErrNoFeedAddress
+ }
+ durStr := cfg.Get("scrape_interval").String()
+ if durStr == "" {
+ durStr = "10m"
+ }
+ dur, err := time.ParseDuration(durStr)
+ if err != nil {
+ return nil, fmt.Errorf("failed parse duration at source: %w", err)
+ }
+ return &Feed{
+ feed: feedSource,
+ scrapeInterval: dur,
+ feedParser: gofeed.NewParser(),
+ }, nil
+}
+
+func (a *Feed) Retrive(ctx context.Context) <-chan model.Item {
+ out := make(chan model.Item)
+ ticker := time.NewTicker(a.scrapeInterval)
+ go func() {
+ <-ctx.Done()
+ ticker.Stop()
+ }()
+ go func() {
+ defer close(out)
+ for _, item := range a.scrapeFeed(ctx) {
+ out <- item
+ }
+ for range ticker.C {
+ for _, item := range a.scrapeFeed(ctx) {
+ out <- item
+ }
+ }
+ }()
+
+ return out
+}
+
+func (a *Feed) scrapeFeed(ctx context.Context) []model.Item {
+ feed, err := a.feedParser.ParseURLWithContext(a.feed, ctx)
+ if err != nil {
+ a.logger.Error("failed parse feed", slog.Any("err", err))
+ return nil
+ }
+ result := make([]model.Item, 0, len(feed.Items))
+ for _, it := range feed.Items {
+ date := time.Now()
+ if it.PublishedParsed != nil {
+ date = *it.PublishedParsed
+ }
+ image := ""
+ if it.Image != nil {
+ image = it.Image.URL
+ }
+ result = append(result, model.Item{
+ ID: it.GUID,
+ Date: date,
+ Title: it.Title,
+ Summary: it.Description,
+ Link: it.Link,
+ Img: image,
+ })
+ }
+
+ return result
+}
diff --git a/internal/target/html.go b/internal/target/html.go
new file mode 100644
index 0000000..84d8662
--- /dev/null
+++ b/internal/target/html.go
@@ -0,0 +1,14 @@
+package target
+
+import "strings"
+
+func processHTML(html string) string {
+ html = strings.ReplaceAll(html, "\n", " ")
+ html = strings.ReplaceAll(html, "<br>", "\n")
+ html = strings.ReplaceAll(html, "<br />", "\n")
+ html = strings.ReplaceAll(html, "<br/>", "\n")
+ html = strings.ReplaceAll(html, "<p>", "")
+ html = strings.ReplaceAll(html, "</p>", "\n")
+
+ return html
+}
diff --git a/internal/target/telegram.go b/internal/target/telegram.go
new file mode 100644
index 0000000..f4d4aff
--- /dev/null
+++ b/internal/target/telegram.go
@@ -0,0 +1,112 @@
+package target
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "io"
+ "log/slog"
+ "net/http"
+ "net/url"
+ "os"
+ "time"
+
+ "github.com/microcosm-cc/bluemonday"
+ cm "go.neonxp.ru/conf/model"
+ "go.neonxp.ru/pose/internal/model"
+)
+
+var (
+ ErrNoToken = errors.New("no api token")
+ ErrNoGroup = errors.New("no group")
+)
+
+const telegramRequestTimeout = 30 * time.Second
+const telegramMaxItemsChan = 32
+
+type Telegram struct {
+ logger *slog.Logger
+ apiToken string
+ group string
+ client *http.Client
+ policy *bluemonday.Policy
+}
+
+func NewTelegram(cfg cm.Group, logger *slog.Logger) (*Telegram, error) {
+ token := cfg.Get("token").StringExt("", os.LookupEnv)
+ if token == "" {
+ return nil, ErrNoToken
+ }
+ group := cfg.Get("group").StringExt("", os.LookupEnv)
+ if group == "" {
+ return nil, ErrNoGroup
+ }
+
+ pol := bluemonday.NewPolicy()
+
+ pol.AllowAttrs("href").OnElements("a")
+ pol.AllowAttrs("class").OnElements("span")
+ pol.AllowElements("p", "br", "b", "strong", "i", "em", "u", "ins", "s", "strike", "del", "code", "pre", "blockquote")
+
+ return &Telegram{
+ logger: logger,
+ apiToken: token,
+ group: group,
+ client: &http.Client{Timeout: telegramRequestTimeout},
+ policy: pol,
+ }, nil
+}
+
+func (t *Telegram) Send(ctx context.Context) chan<- model.Item {
+ ch := make(chan model.Item, telegramMaxItemsChan)
+ go func() {
+ defer close(ch)
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case item := <-ch:
+ if err := t.sendMessage(item); err != nil {
+ t.logger.ErrorContext(ctx, "failed send feed item to telegram", slog.Any("err", err))
+ continue
+ }
+ t.logger.InfoContext(ctx, "send item to telegram", slog.String("id", item.ID))
+ }
+ }
+ }()
+
+ return ch
+}
+
+func (t *Telegram) sendMessage(it model.Item) error {
+ sendMessageURL := fmt.Sprintf("https://api.telegram.org/bot%s/sendMessage", t.apiToken)
+
+ message := it.BuildMessage()
+ message = t.policy.Sanitize(message)
+ message = processHTML(message)
+
+ params := url.Values{}
+ params.Set("chat_id", t.group)
+ params.Set("text", message)
+ params.Set("parse_mode", "HTML")
+
+ resp, err := t.client.PostForm(sendMessageURL, params)
+ if err != nil {
+ return err
+ }
+
+ if resp.StatusCode < 200 || resp.StatusCode >= 300 {
+ msg, err := io.ReadAll(resp.Body)
+ if err != nil {
+ return fmt.Errorf("failed read error body: %w", err)
+ }
+ return fmt.Errorf(
+ "invalid status code %d (%s): %s",
+ resp.StatusCode,
+ resp.Status,
+ string(msg),
+ )
+ }
+
+ return nil
+}