diff options
Diffstat (limited to 'writer.go')
-rw-r--r-- | writer.go | 89 |
1 files changed, 19 insertions, 70 deletions
@@ -9,93 +9,42 @@ import ( "go.mongodb.org/mongo-driver/mongo/options" ) -func write(ctx context.Context, db *mongo.Database, nodesCh chan Node, waysCh chan Way, relationsCh chan Relation, initial bool, blockSize int, worker int) error { - nodes := db.Collection("nodes") - ways := db.Collection("ways") - relations := db.Collection("relations") +func write(ctx context.Context, db *mongo.Database, insertCh chan Object, initial bool, blockSize int, worker int) error { + nodes := db.Collection("items") opts := (new(options.BulkWriteOptions)).SetOrdered(false) - nodesBuffer := make([]mongo.WriteModel, 0, blockSize) - waysBuffer := make([]mongo.WriteModel, 0, blockSize) - relationsBuffer := make([]mongo.WriteModel, 0, blockSize) - nc := 0 - wc := 0 - rc := 0 + buf := make([]mongo.WriteModel, 0, blockSize) + ic := 0 for { select { - case w := <-waysCh: + case w := <-insertCh: if initial { um := mongo.NewInsertOneModel() um.SetDocument(w) - waysBuffer = append(waysBuffer, um) + buf = append(buf, um) } else { um := mongo.NewUpdateOneModel() um.SetUpsert(true) um.SetUpdate(w) - um.SetFilter(bson.M{"osm_id": w.OsmID}) - waysBuffer = append(waysBuffer, um) - } - - case n := <-nodesCh: - if initial { - um := mongo.NewInsertOneModel() - um.SetDocument(n) - nodesBuffer = append(nodesBuffer, um) - } else { - um := mongo.NewUpdateOneModel() - um.SetUpsert(true) - um.SetUpdate(n) - um.SetFilter(bson.M{"osm_id": n.OsmID}) - nodesBuffer = append(nodesBuffer, um) - } - case r := <-relationsCh: - if initial { - um := mongo.NewInsertOneModel() - um.SetDocument(r) - relationsBuffer = append(relationsBuffer, um) - } else { - um := mongo.NewUpdateOneModel() - um.SetUpsert(true) - um.SetUpdate(r) - um.SetFilter(bson.M{"osm_id": r.OsmID}) - relationsBuffer = append(relationsBuffer, um) + um.SetFilter(bson.M{"osm_id": w.ID}) + buf = append(buf, um) } case <-ctx.Done(): - log.Printf("[%d] saving last info in buffers...", worker) - if _, err := nodes.BulkWrite(context.Background(), nodesBuffer, opts); err != nil { - return err - } - if _, err := ways.BulkWrite(context.Background(), waysBuffer, opts); err != nil { - return err - } - if _, err := relations.BulkWrite(context.Background(), relationsBuffer, opts); err != nil { - return err + if len(buf) > 0 { + log.Printf("Worker: %d\tSaving last info in buffers...", worker) + if _, err := nodes.BulkWrite(context.Background(), buf, opts); err != nil { + return err + } } - log.Printf("[%d] Done", worker) + log.Printf("Worker: %d\tDone", worker) return nil } - if len(nodesBuffer) == blockSize { - nc++ - log.Printf("[%d] nodes %d ways %d relations %d", worker, nc, wc, rc) - if _, err := nodes.BulkWrite(context.Background(), nodesBuffer, opts); err != nil { - return err - } - nodesBuffer = make([]mongo.WriteModel, 0) - } - if len(waysBuffer) == blockSize { - wc++ - log.Printf("[%d] nodes %d ways %d relations %d", worker, nc, wc, rc) - if _, err := ways.BulkWrite(context.Background(), waysBuffer, opts); err != nil { - return err - } - waysBuffer = make([]mongo.WriteModel, 0) - } - if len(relationsBuffer) == blockSize { - rc++ - log.Printf("[%d] nodes %d ways %d relations %d", worker, nc, wc, rc) - if _, err := relations.BulkWrite(context.Background(), relationsBuffer, opts); err != nil { + if len(buf) == blockSize { + ic++ + log.Printf("Worker: %d\tWriting block %d (%d objects)", worker, ic, ic*blockSize) + if _, err := nodes.BulkWrite(context.Background(), buf, opts); err != nil { return err } - relationsBuffer = make([]mongo.WriteModel, 0) + buf = make([]mongo.WriteModel, 0) } } } |