diff options
author | Alexander Kiryukhin <a.kiryukhin@mail.ru> | 2022-05-01 21:50:12 +0300 |
---|---|---|
committer | Alexander Kiryukhin <a.kiryukhin@mail.ru> | 2022-05-01 21:50:12 +0300 |
commit | 9fcf8e29214210612d545bed50d7f889800ac639 (patch) | |
tree | 3a99d2cd37fb8158d49abc1de6298758d205c9dd /channels |
Initial
Diffstat (limited to 'channels')
-rw-r--r-- | channels/doc.go | 2 | ||||
-rw-r--r-- | channels/fanin.go | 23 | ||||
-rw-r--r-- | channels/fanin_test.go | 57 | ||||
-rw-r--r-- | channels/fanout.go | 23 | ||||
-rw-r--r-- | channels/fanout_test.go | 48 |
5 files changed, 153 insertions, 0 deletions
diff --git a/channels/doc.go b/channels/doc.go new file mode 100644 index 0000000..0ac1c76 --- /dev/null +++ b/channels/doc.go @@ -0,0 +1,2 @@ +// Функции работы над каналами +package channels diff --git a/channels/fanin.go b/channels/fanin.go new file mode 100644 index 0000000..679aeee --- /dev/null +++ b/channels/fanin.go @@ -0,0 +1,23 @@ +package channels + +import "sync/atomic" + +// FanIn сливает несколько каналов в один +func FanIn[T any](chans ...chan T) chan T { + out := make(chan T) + workers := int64(len(chans)) + for _, ch := range chans { + func(ch chan T) { + go func() { + for t := range ch { + out <- t + } + atomic.AddInt64(&workers, -1) + if workers == 0 { + close(out) + } + }() + }(ch) + } + return out +} diff --git a/channels/fanin_test.go b/channels/fanin_test.go new file mode 100644 index 0000000..dd0c889 --- /dev/null +++ b/channels/fanin_test.go @@ -0,0 +1,57 @@ +package channels + +import ( + "reflect" + "sort" + "testing" +) + +func TestFanIn(t *testing.T) { + type args struct { + chans [][]int + } + tests := []struct { + name string + args args + want []int + }{ + { + name: "test fanin", + args: args{ + chans: [][]int{ + {1, 2, 3}, + {4, 5, 6}, + {7, 8, 9}, + }, + }, + want: []int{1, 2, 3, 4, 5, 6, 7, 8, 9}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := []int{} + chans := make([]chan int, len(tt.args.chans)) + for i, v := range tt.args.chans { + func(i int, v []int) { + chans[i] = make(chan int, len(v)) + go func(i int, v []int) { + for _, v2 := range v { + chans[i] <- v2 + } + close(chans[i]) + }(i, v) + }(i, v) + } + + ch := FanIn(chans...) + + for o := range ch { + got = append(got, o) + } + sort.Ints(got) + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("FanIn() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/channels/fanout.go b/channels/fanout.go new file mode 100644 index 0000000..23a32b4 --- /dev/null +++ b/channels/fanout.go @@ -0,0 +1,23 @@ +package channels + +// FanOut раскидывает очередь канала на несколько каналов равномерно +func FanOut[T any](in chan T, workers int) []chan T { + out := make([]chan T, workers) + for i := 0; i < workers; i++ { + out[i] = make(chan T) + } + go func() { + i := 0 + for t := range in { + out[i] <- t + i++ + if i == workers { + i = 0 + } + } + for i := 0; i < workers; i++ { + close(out[i]) + } + }() + return out +} diff --git a/channels/fanout_test.go b/channels/fanout_test.go new file mode 100644 index 0000000..f8b0380 --- /dev/null +++ b/channels/fanout_test.go @@ -0,0 +1,48 @@ +package channels + +import ( + "reflect" + "sort" + "testing" +) + +func TestFanOut(t *testing.T) { + type args struct { + in []int + workers int + } + tests := []struct { + name string + args args + }{ + { + name: "test fanout - fanin", + args: args{ + in: []int{1, 2, 3, 4, 5, 6, 7, 8, 9}, + workers: 2, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + in := make(chan int) + go func() { + for _, v := range tt.args.in { + in <- v + } + close(in) + }() + + ch := FanOut(in, tt.args.workers) + gotCh := FanIn(ch...) + got := []int{} + for v := range gotCh { + got = append(got, v) + } + sort.Ints(got) + if !reflect.DeepEqual(got, tt.args.in) { + t.Errorf("FanIn() = %v, want %v", got, tt.args.in) + } + }) + } +} |