aboutsummaryrefslogtreecommitdiff
path: root/writer.go
diff options
context:
space:
mode:
authorAlexander NeonXP Kiryukhin <a.kiryukhin@mail.ru>2019-05-30 14:05:08 +0300
committerAlexander NeonXP Kiryukhin <a.kiryukhin@mail.ru>2019-05-30 14:05:08 +0300
commit2f4b6e90597784a8a7c01027e0ff5c6b69634a96 (patch)
tree5d94daf9fc394425b03baf3915ffd03a0011e442 /writer.go
parent97fd40df4d85d91f3414dc88aa96222c1d827b49 (diff)
Parallel write
Speedup
Diffstat (limited to 'writer.go')
-rw-r--r--writer.go101
1 files changed, 101 insertions, 0 deletions
diff --git a/writer.go b/writer.go
new file mode 100644
index 0000000..fe82656
--- /dev/null
+++ b/writer.go
@@ -0,0 +1,101 @@
+package main
+
+import (
+ "context"
+ "log"
+
+ "go.mongodb.org/mongo-driver/bson"
+ "go.mongodb.org/mongo-driver/mongo"
+ "go.mongodb.org/mongo-driver/mongo/options"
+)
+
+func write(ctx context.Context, db *mongo.Database, nodesCh chan Node, waysCh chan Way, relationsCh chan Relation, initial bool, blockSize int, worker int) error {
+ nodes := db.Collection("nodes")
+ ways := db.Collection("ways")
+ relations := db.Collection("relations")
+ opts := (new(options.BulkWriteOptions)).SetOrdered(false)
+ nodesBuffer := make([]mongo.WriteModel, 0, blockSize)
+ waysBuffer := make([]mongo.WriteModel, 0, blockSize)
+ relationsBuffer := make([]mongo.WriteModel, 0, blockSize)
+ nc := 0
+ wc := 0
+ rc := 0
+ for {
+ select {
+ case w := <-waysCh:
+ if initial {
+ um := mongo.NewInsertOneModel()
+ um.SetDocument(w)
+ waysBuffer = append(waysBuffer, um)
+ } else {
+ um := mongo.NewUpdateOneModel()
+ um.SetUpsert(true)
+ um.SetUpdate(w)
+ um.SetFilter(bson.M{"osm_id": w.OsmID})
+ waysBuffer = append(waysBuffer, um)
+ }
+
+ case n := <-nodesCh:
+ if initial {
+ um := mongo.NewInsertOneModel()
+ um.SetDocument(n)
+ nodesBuffer = append(nodesBuffer, um)
+ } else {
+ um := mongo.NewUpdateOneModel()
+ um.SetUpsert(true)
+ um.SetUpdate(n)
+ um.SetFilter(bson.M{"osm_id": n.OsmID})
+ nodesBuffer = append(nodesBuffer, um)
+ }
+ case r := <-relationsCh:
+ if initial {
+ um := mongo.NewInsertOneModel()
+ um.SetDocument(r)
+ relationsBuffer = append(relationsBuffer, um)
+ } else {
+ um := mongo.NewUpdateOneModel()
+ um.SetUpsert(true)
+ um.SetUpdate(r)
+ um.SetFilter(bson.M{"osm_id": r.OsmID})
+ relationsBuffer = append(relationsBuffer, um)
+ }
+ case <-ctx.Done():
+ log.Printf("[%d] saving last info in buffers...", worker)
+ if _, err := nodes.BulkWrite(context.Background(), nodesBuffer, opts); err != nil {
+ return err
+ }
+ if _, err := ways.BulkWrite(context.Background(), waysBuffer, opts); err != nil {
+ return err
+ }
+ if _, err := relations.BulkWrite(context.Background(), relationsBuffer, opts); err != nil {
+ return err
+ }
+ log.Printf("[%d] Done", worker)
+ return nil
+ }
+ if len(nodesBuffer) == blockSize {
+ nc++
+ log.Printf("[%d] nodes %d ways %d relations %d", worker, nc, wc, rc)
+ if _, err := nodes.BulkWrite(context.Background(), nodesBuffer, opts); err != nil {
+ return err
+ }
+ nodesBuffer = make([]mongo.WriteModel, 0)
+ }
+ if len(waysBuffer) == blockSize {
+ wc++
+ log.Printf("[%d] nodes %d ways %d relations %d", worker, nc, wc, rc)
+ if _, err := ways.BulkWrite(context.Background(), waysBuffer, opts); err != nil {
+ return err
+ }
+ waysBuffer = make([]mongo.WriteModel, 0)
+ }
+ if len(relationsBuffer) == blockSize {
+ rc++
+ log.Printf("[%d] nodes %d ways %d relations %d", worker, nc, wc, rc)
+ if _, err := relations.BulkWrite(context.Background(), relationsBuffer, opts); err != nil {
+ return err
+ }
+ relationsBuffer = make([]mongo.WriteModel, 0)
+ }
+ }
+}