aboutsummaryrefslogtreecommitdiff
path: root/internal/application/application.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/application/application.go')
-rw-r--r--internal/application/application.go148
1 files changed, 148 insertions, 0 deletions
diff --git a/internal/application/application.go b/internal/application/application.go
new file mode 100644
index 0000000..d61f38c
--- /dev/null
+++ b/internal/application/application.go
@@ -0,0 +1,148 @@
+package application
+
+import (
+ "context"
+ "fmt"
+ "log/slog"
+ "os"
+ "sync"
+
+ cm "go.neonxp.ru/conf/model"
+ "go.neonxp.ru/pose/internal/database"
+ "go.neonxp.ru/pose/internal/model"
+ "go.neonxp.ru/pose/internal/source"
+ "go.neonxp.ru/pose/internal/target"
+)
+
+type Application struct {
+ logger *slog.Logger
+ db *database.DB
+ sources []Source
+ targets []Target
+ wg sync.WaitGroup
+}
+
+func New(cfg cm.Group, logger *slog.Logger) (*Application, error) {
+ dbfile := cfg.Get("db_file").StringExt("", os.LookupEnv)
+ db, err := database.New(dbfile)
+ if err != nil {
+ return nil, fmt.Errorf("failed create db: %w", err)
+ }
+
+ app := &Application{
+ logger: logger,
+ db: db,
+ sources: []Source{},
+ targets: []Target{},
+ wg: sync.WaitGroup{},
+ }
+
+ //Build sources
+ for _, it := range cfg.Get("sources").Group() {
+ name := it.String()
+ switch it.Name {
+ case cm.Ident("feed"):
+ src, err := source.NewFeed(
+ it.Group(),
+ app.logger.With(slog.String("source", name)),
+ )
+ if err != nil {
+ return nil, err
+ }
+ app.sources = append(app.sources, src)
+ default:
+ return nil, fmt.Errorf("unknown source type: %s", it.Name)
+ }
+ }
+
+ // Build targets
+ for _, it := range cfg.Get("targets").Group() {
+ name := it.String()
+ switch it.Name {
+ case cm.Ident("telegram"):
+ tgt, err := target.NewTelegram(
+ it.Group(),
+ app.logger.With(slog.String("target", name)),
+ )
+ if err != nil {
+ return nil, err
+ }
+ app.targets = append(app.targets, tgt)
+ default:
+ return nil, fmt.Errorf("unknown target type: %s", it.Name)
+ }
+ }
+
+ return app, nil
+}
+
+func (a *Application) Run(ctx context.Context) error {
+ defer a.wg.Wait()
+
+ // Сборный канал со всех источников
+ inChan := a.inChan(ctx)
+
+ // Срез каналов всех потребителей
+ outChans := a.outChans(ctx)
+
+ // Перекладывание сообщений из inChan во все outChans
+ for it := range inChan {
+ for _, ch := range outChans {
+ select {
+ case ch <- it:
+ slog.InfoContext(ctx, "trying send feed item to target", slog.String("id", it.ID))
+ default:
+ slog.WarnContext(ctx, "feed item ignored because no free targets")
+ }
+ }
+ }
+
+ return nil
+}
+
+func (a *Application) Close() error {
+ return a.db.Close()
+}
+
+func (a *Application) inChan(ctx context.Context) chan model.Item {
+ inChan := make(chan model.Item, 1)
+ a.wg.Go(func() {
+ <-ctx.Done()
+ close(inChan)
+ })
+
+ for _, s := range a.sources {
+ a.wg.Go(func() {
+ for it := range s.Retrive(ctx) {
+ if a.db.Exists(it.ID) {
+ continue
+ }
+ if err := a.db.Append(it.ID); err != nil {
+ a.logger.ErrorContext(ctx, "failed to add item to deduplication db", slog.Any("err", err))
+ continue
+ }
+
+ inChan <- it
+ }
+ })
+ }
+
+ return inChan
+}
+
+func (a *Application) outChans(ctx context.Context) []chan<- model.Item {
+ outChans := make([]chan<- model.Item, len(a.targets))
+
+ for i, t := range a.targets {
+ outChans[i] = t.Send(ctx)
+ }
+ return outChans
+}
+
+type Source interface {
+ Retrive(ctx context.Context) <-chan model.Item
+}
+
+type Target interface {
+ Send(ctx context.Context) chan<- model.Item
+}