aboutsummaryrefslogtreecommitdiff
path: root/channels
diff options
context:
space:
mode:
authorAlexander Kiryukhin <a.kiryukhin@mail.ru>2022-05-01 21:50:12 +0300
committerAlexander Kiryukhin <a.kiryukhin@mail.ru>2022-05-01 21:50:12 +0300
commit9fcf8e29214210612d545bed50d7f889800ac639 (patch)
tree3a99d2cd37fb8158d49abc1de6298758d205c9dd /channels
Initial
Diffstat (limited to 'channels')
-rw-r--r--channels/doc.go2
-rw-r--r--channels/fanin.go23
-rw-r--r--channels/fanin_test.go57
-rw-r--r--channels/fanout.go23
-rw-r--r--channels/fanout_test.go48
5 files changed, 153 insertions, 0 deletions
diff --git a/channels/doc.go b/channels/doc.go
new file mode 100644
index 0000000..0ac1c76
--- /dev/null
+++ b/channels/doc.go
@@ -0,0 +1,2 @@
+// Функции работы над каналами
+package channels
diff --git a/channels/fanin.go b/channels/fanin.go
new file mode 100644
index 0000000..679aeee
--- /dev/null
+++ b/channels/fanin.go
@@ -0,0 +1,23 @@
+package channels
+
+import "sync/atomic"
+
+// FanIn сливает несколько каналов в один
+func FanIn[T any](chans ...chan T) chan T {
+ out := make(chan T)
+ workers := int64(len(chans))
+ for _, ch := range chans {
+ func(ch chan T) {
+ go func() {
+ for t := range ch {
+ out <- t
+ }
+ atomic.AddInt64(&workers, -1)
+ if workers == 0 {
+ close(out)
+ }
+ }()
+ }(ch)
+ }
+ return out
+}
diff --git a/channels/fanin_test.go b/channels/fanin_test.go
new file mode 100644
index 0000000..dd0c889
--- /dev/null
+++ b/channels/fanin_test.go
@@ -0,0 +1,57 @@
+package channels
+
+import (
+ "reflect"
+ "sort"
+ "testing"
+)
+
+func TestFanIn(t *testing.T) {
+ type args struct {
+ chans [][]int
+ }
+ tests := []struct {
+ name string
+ args args
+ want []int
+ }{
+ {
+ name: "test fanin",
+ args: args{
+ chans: [][]int{
+ {1, 2, 3},
+ {4, 5, 6},
+ {7, 8, 9},
+ },
+ },
+ want: []int{1, 2, 3, 4, 5, 6, 7, 8, 9},
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ got := []int{}
+ chans := make([]chan int, len(tt.args.chans))
+ for i, v := range tt.args.chans {
+ func(i int, v []int) {
+ chans[i] = make(chan int, len(v))
+ go func(i int, v []int) {
+ for _, v2 := range v {
+ chans[i] <- v2
+ }
+ close(chans[i])
+ }(i, v)
+ }(i, v)
+ }
+
+ ch := FanIn(chans...)
+
+ for o := range ch {
+ got = append(got, o)
+ }
+ sort.Ints(got)
+ if !reflect.DeepEqual(got, tt.want) {
+ t.Errorf("FanIn() = %v, want %v", got, tt.want)
+ }
+ })
+ }
+}
diff --git a/channels/fanout.go b/channels/fanout.go
new file mode 100644
index 0000000..23a32b4
--- /dev/null
+++ b/channels/fanout.go
@@ -0,0 +1,23 @@
+package channels
+
+// FanOut раскидывает очередь канала на несколько каналов равномерно
+func FanOut[T any](in chan T, workers int) []chan T {
+ out := make([]chan T, workers)
+ for i := 0; i < workers; i++ {
+ out[i] = make(chan T)
+ }
+ go func() {
+ i := 0
+ for t := range in {
+ out[i] <- t
+ i++
+ if i == workers {
+ i = 0
+ }
+ }
+ for i := 0; i < workers; i++ {
+ close(out[i])
+ }
+ }()
+ return out
+}
diff --git a/channels/fanout_test.go b/channels/fanout_test.go
new file mode 100644
index 0000000..f8b0380
--- /dev/null
+++ b/channels/fanout_test.go
@@ -0,0 +1,48 @@
+package channels
+
+import (
+ "reflect"
+ "sort"
+ "testing"
+)
+
+func TestFanOut(t *testing.T) {
+ type args struct {
+ in []int
+ workers int
+ }
+ tests := []struct {
+ name string
+ args args
+ }{
+ {
+ name: "test fanout - fanin",
+ args: args{
+ in: []int{1, 2, 3, 4, 5, 6, 7, 8, 9},
+ workers: 2,
+ },
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ in := make(chan int)
+ go func() {
+ for _, v := range tt.args.in {
+ in <- v
+ }
+ close(in)
+ }()
+
+ ch := FanOut(in, tt.args.workers)
+ gotCh := FanIn(ch...)
+ got := []int{}
+ for v := range gotCh {
+ got = append(got, v)
+ }
+ sort.Ints(got)
+ if !reflect.DeepEqual(got, tt.args.in) {
+ t.Errorf("FanIn() = %v, want %v", got, tt.args.in)
+ }
+ })
+ }
+}