aboutsummaryrefslogtreecommitdiff
path: root/writer.go
diff options
context:
space:
mode:
authorAlexander NeonXP Kiryukhin <a.kiryukhin@mail.ru>2019-06-03 13:59:49 +0300
committerAlexander NeonXP Kiryukhin <a.kiryukhin@mail.ru>2019-06-03 13:59:49 +0300
commitc36227104708bf48c95d2b61ba9176370eb0baf5 (patch)
tree47e0a0373956556955f1dcfb8f00421a6c714025 /writer.go
parent2f4b6e90597784a8a7c01027e0ff5c6b69634a96 (diff)
Store objects on single collections (overpass like json)HEADmaster
Filter deleted objects
Diffstat (limited to 'writer.go')
-rw-r--r--writer.go89
1 files changed, 19 insertions, 70 deletions
diff --git a/writer.go b/writer.go
index fe82656..8322929 100644
--- a/writer.go
+++ b/writer.go
@@ -9,93 +9,42 @@ import (
"go.mongodb.org/mongo-driver/mongo/options"
)
-func write(ctx context.Context, db *mongo.Database, nodesCh chan Node, waysCh chan Way, relationsCh chan Relation, initial bool, blockSize int, worker int) error {
- nodes := db.Collection("nodes")
- ways := db.Collection("ways")
- relations := db.Collection("relations")
+func write(ctx context.Context, db *mongo.Database, insertCh chan Object, initial bool, blockSize int, worker int) error {
+ nodes := db.Collection("items")
opts := (new(options.BulkWriteOptions)).SetOrdered(false)
- nodesBuffer := make([]mongo.WriteModel, 0, blockSize)
- waysBuffer := make([]mongo.WriteModel, 0, blockSize)
- relationsBuffer := make([]mongo.WriteModel, 0, blockSize)
- nc := 0
- wc := 0
- rc := 0
+ buf := make([]mongo.WriteModel, 0, blockSize)
+ ic := 0
for {
select {
- case w := <-waysCh:
+ case w := <-insertCh:
if initial {
um := mongo.NewInsertOneModel()
um.SetDocument(w)
- waysBuffer = append(waysBuffer, um)
+ buf = append(buf, um)
} else {
um := mongo.NewUpdateOneModel()
um.SetUpsert(true)
um.SetUpdate(w)
- um.SetFilter(bson.M{"osm_id": w.OsmID})
- waysBuffer = append(waysBuffer, um)
- }
-
- case n := <-nodesCh:
- if initial {
- um := mongo.NewInsertOneModel()
- um.SetDocument(n)
- nodesBuffer = append(nodesBuffer, um)
- } else {
- um := mongo.NewUpdateOneModel()
- um.SetUpsert(true)
- um.SetUpdate(n)
- um.SetFilter(bson.M{"osm_id": n.OsmID})
- nodesBuffer = append(nodesBuffer, um)
- }
- case r := <-relationsCh:
- if initial {
- um := mongo.NewInsertOneModel()
- um.SetDocument(r)
- relationsBuffer = append(relationsBuffer, um)
- } else {
- um := mongo.NewUpdateOneModel()
- um.SetUpsert(true)
- um.SetUpdate(r)
- um.SetFilter(bson.M{"osm_id": r.OsmID})
- relationsBuffer = append(relationsBuffer, um)
+ um.SetFilter(bson.M{"osm_id": w.ID})
+ buf = append(buf, um)
}
case <-ctx.Done():
- log.Printf("[%d] saving last info in buffers...", worker)
- if _, err := nodes.BulkWrite(context.Background(), nodesBuffer, opts); err != nil {
- return err
- }
- if _, err := ways.BulkWrite(context.Background(), waysBuffer, opts); err != nil {
- return err
- }
- if _, err := relations.BulkWrite(context.Background(), relationsBuffer, opts); err != nil {
- return err
+ if len(buf) > 0 {
+ log.Printf("Worker: %d\tSaving last info in buffers...", worker)
+ if _, err := nodes.BulkWrite(context.Background(), buf, opts); err != nil {
+ return err
+ }
}
- log.Printf("[%d] Done", worker)
+ log.Printf("Worker: %d\tDone", worker)
return nil
}
- if len(nodesBuffer) == blockSize {
- nc++
- log.Printf("[%d] nodes %d ways %d relations %d", worker, nc, wc, rc)
- if _, err := nodes.BulkWrite(context.Background(), nodesBuffer, opts); err != nil {
- return err
- }
- nodesBuffer = make([]mongo.WriteModel, 0)
- }
- if len(waysBuffer) == blockSize {
- wc++
- log.Printf("[%d] nodes %d ways %d relations %d", worker, nc, wc, rc)
- if _, err := ways.BulkWrite(context.Background(), waysBuffer, opts); err != nil {
- return err
- }
- waysBuffer = make([]mongo.WriteModel, 0)
- }
- if len(relationsBuffer) == blockSize {
- rc++
- log.Printf("[%d] nodes %d ways %d relations %d", worker, nc, wc, rc)
- if _, err := relations.BulkWrite(context.Background(), relationsBuffer, opts); err != nil {
+ if len(buf) == blockSize {
+ ic++
+ log.Printf("Worker: %d\tWriting block %d (%d objects)", worker, ic, ic*blockSize)
+ if _, err := nodes.BulkWrite(context.Background(), buf, opts); err != nil {
return err
}
- relationsBuffer = make([]mongo.WriteModel, 0)
+ buf = make([]mongo.WriteModel, 0)
}
}
}