// Package application wires the configured sources to the configured targets
// and relays items between them.
package application

import (
	"context"
	"fmt"
	"log/slog"
	"os"
	"sync"

	cm "go.neonxp.ru/conf/model"
	"go.neonxp.ru/pose/internal/database"
	"go.neonxp.ru/pose/internal/model"
	"go.neonxp.ru/pose/internal/source"
	"go.neonxp.ru/pose/internal/target"
)

// Application owns the deduplication database, the configured sources and
// targets, and the goroutines that move items between them.
type Application struct {
	logger  *slog.Logger
	db      *database.DB
	sources []Source
	targets []Target
	wg      sync.WaitGroup
}

// New builds an Application from cfg.
//
// It opens the database named by the "db_file" setting, then creates one
// source per entry of the "sources" group and one target per entry of the
// "targets" group. Only the "feed" source kind and the "telegram" target kind
// are recognised; any other kind is an error.
//
// If anything fails after the database has been opened, the database is
// closed again before the error is returned.
func New(cfg cm.Group, logger *slog.Logger) (*Application, error) {
	dbfile := cfg.Get("db_file").StringExt("", os.LookupEnv)
	db, err := database.New(dbfile)
	if err != nil {
		return nil, fmt.Errorf("failed create db: %w", err)
	}

	// Release the database handle on every failure path below.
	built := false
	defer func() {
		if !built {
			// The construction error is the one worth reporting; a failure
			// to close a handle nobody will use is deliberately dropped.
			_ = db.Close()
		}
	}()

	app := &Application{
		logger:  logger,
		db:      db,
		sources: []Source{},
		targets: []Target{},
	}

	// Build sources.
	for _, it := range cfg.Get("sources").Group() {
		name := it.String()
		switch it.Name {
		case cm.Ident("feed"):
			src, err := source.NewFeed(
				it.Group(),
				app.logger.With(slog.String("source", name)),
			)
			if err != nil {
				return nil, err
			}
			app.sources = append(app.sources, src)
		default:
			return nil, fmt.Errorf("unknown source type: %s", it.Name)
		}
	}

	// Build targets.
	for _, it := range cfg.Get("targets").Group() {
		name := it.String()
		switch it.Name {
		case cm.Ident("telegram"):
			tgt, err := target.NewTelegram(
				it.Group(),
				app.logger.With(slog.String("target", name)),
			)
			if err != nil {
				return nil, err
			}
			app.targets = append(app.targets, tgt)
		default:
			return nil, fmt.Errorf("unknown target type: %s", it.Name)
		}
	}

	built = true
	return app, nil
}

// Run relays items from every source to every target until the merged input
// channel is closed, then waits for the goroutines started on its behalf.
//
// Delivery to a target is non-blocking: if a target's channel cannot accept
// the item immediately, the item is dropped for that target and a warning is
// logged. Run always returns nil.
func (a *Application) Run(ctx context.Context) error {
	defer a.wg.Wait()

	// Merged channel fed by all sources.
	inChan := a.inChan(ctx)

	// One inbound channel per target.
	outChans := a.outChans(ctx)

	// Fan every item from inChan out to all outChans.
	for it := range inChan {
		for _, ch := range outChans {
			select {
			case ch <- it:
				a.logger.InfoContext(ctx, "trying send feed item to target", slog.String("id", it.ID))
			default:
				a.logger.WarnContext(ctx, "feed item ignored because no free targets", slog.String("id", it.ID))
			}
		}
	}

	return nil
}

// Close closes the underlying database.
func (a *Application) Close() error {
	return a.db.Close()
}

// inChan starts one goroutine per source and returns a single channel that
// carries the items of all sources which are not yet recorded in the database.
//
// Each new item's ID is appended to the database before the item is
// forwarded; if appending fails the error is logged and the item is skipped.
//
// The returned channel is closed once ctx has been cancelled AND every source
// goroutine has returned. Closing only after the producers are done is what
// makes the plain send below safe: nothing can send on an already-closed
// channel.
//
// NOTE(review): shutdown relies on each Source closing the channel returned
// by Retrive after ctx is cancelled, and on the caller draining the returned
// channel until it is closed — confirm against the Source implementations.
func (a *Application) inChan(ctx context.Context) chan model.Item {
	inChan := make(chan model.Item, 1)

	// Tracks only the producers, so the closer below can wait for them
	// without also waiting for itself (it is registered in a.wg too).
	var producers sync.WaitGroup

	for _, s := range a.sources {
		producers.Add(1)
		a.wg.Go(func() {
			defer producers.Done()

			for it := range s.Retrive(ctx) {
				if a.db.Exists(it.ID) {
					continue
				}
				if err := a.db.Append(it.ID); err != nil {
					a.logger.ErrorContext(ctx, "failed to add item to deduplication db", slog.Any("err", err))
					continue
				}
				inChan <- it
			}
		})
	}

	a.wg.Go(func() {
		<-ctx.Done()
		producers.Wait()
		close(inChan)
	})

	return inChan
}

// outChans asks every target for its inbound channel and returns them in the
// same order as a.targets.
func (a *Application) outChans(ctx context.Context) []chan<- model.Item {
	outChans := make([]chan<- model.Item, len(a.targets))
	for i, t := range a.targets {
		outChans[i] = t.Send(ctx)
	}

	return outChans
}

// Source produces items on the channel returned by Retrive.
//
// The method name's spelling is part of the interface implemented by the
// source package, so it is kept as is.
type Source interface {
	Retrive(ctx context.Context) <-chan model.Item
}

// Target accepts items on the channel returned by Send.
type Target interface {
	Send(ctx context.Context) chan<- model.Item
}