diff options
Diffstat (limited to 'internal/application/application.go')
| -rw-r--r-- | internal/application/application.go | 148 |
1 files changed, 148 insertions, 0 deletions
diff --git a/internal/application/application.go b/internal/application/application.go new file mode 100644 index 0000000..d61f38c --- /dev/null +++ b/internal/application/application.go @@ -0,0 +1,148 @@ +package application + +import ( + "context" + "fmt" + "log/slog" + "os" + "sync" + + cm "go.neonxp.ru/conf/model" + "go.neonxp.ru/pose/internal/database" + "go.neonxp.ru/pose/internal/model" + "go.neonxp.ru/pose/internal/source" + "go.neonxp.ru/pose/internal/target" +) + +type Application struct { + logger *slog.Logger + db *database.DB + sources []Source + targets []Target + wg sync.WaitGroup +} + +func New(cfg cm.Group, logger *slog.Logger) (*Application, error) { + dbfile := cfg.Get("db_file").StringExt("", os.LookupEnv) + db, err := database.New(dbfile) + if err != nil { + return nil, fmt.Errorf("failed create db: %w", err) + } + + app := &Application{ + logger: logger, + db: db, + sources: []Source{}, + targets: []Target{}, + wg: sync.WaitGroup{}, + } + + //Build sources + for _, it := range cfg.Get("sources").Group() { + name := it.String() + switch it.Name { + case cm.Ident("feed"): + src, err := source.NewFeed( + it.Group(), + app.logger.With(slog.String("source", name)), + ) + if err != nil { + return nil, err + } + app.sources = append(app.sources, src) + default: + return nil, fmt.Errorf("unknown source type: %s", it.Name) + } + } + + // Build targets + for _, it := range cfg.Get("targets").Group() { + name := it.String() + switch it.Name { + case cm.Ident("telegram"): + tgt, err := target.NewTelegram( + it.Group(), + app.logger.With(slog.String("target", name)), + ) + if err != nil { + return nil, err + } + app.targets = append(app.targets, tgt) + default: + return nil, fmt.Errorf("unknown target type: %s", it.Name) + } + } + + return app, nil +} + +func (a *Application) Run(ctx context.Context) error { + defer a.wg.Wait() + + // Сборный канал со всех источников + inChan := a.inChan(ctx) + + // Срез каналов всех потребителей + outChans := a.outChans(ctx) + + // Перекладывание сообщений из inChan во все outChans + for it := range inChan { + for _, ch := range outChans { + select { + case ch <- it: + slog.InfoContext(ctx, "trying send feed item to target", slog.String("id", it.ID)) + default: + slog.WarnContext(ctx, "feed item ignored because no free targets") + } + } + } + + return nil +} + +func (a *Application) Close() error { + return a.db.Close() +} + +func (a *Application) inChan(ctx context.Context) chan model.Item { + inChan := make(chan model.Item, 1) + a.wg.Go(func() { + <-ctx.Done() + close(inChan) + }) + + for _, s := range a.sources { + a.wg.Go(func() { + for it := range s.Retrive(ctx) { + if a.db.Exists(it.ID) { + continue + } + if err := a.db.Append(it.ID); err != nil { + a.logger.ErrorContext(ctx, "failed to add item to deduplication db", slog.Any("err", err)) + continue + } + + inChan <- it + } + }) + } + + return inChan +} + +func (a *Application) outChans(ctx context.Context) []chan<- model.Item { + outChans := make([]chan<- model.Item, len(a.targets)) + + for i, t := range a.targets { + outChans[i] = t.Send(ctx) + } + return outChans +} + +type Source interface { + Retrive(ctx context.Context) <-chan model.Item +} + +type Target interface { + Send(ctx context.Context) chan<- model.Item +} |
