From 7a2c6b69dc70e849de4ca54b2767c91ad0accf97 Mon Sep 17 00:00:00 2001 From: Alexander NeonXP Kiryukhin Date: Tue, 28 May 2019 01:43:42 +0300 Subject: Fix bulk write --- main.go | 82 +++++++++++++++++++++++++++++++++++++++++++++++------------------ 1 file changed, 59 insertions(+), 23 deletions(-) diff --git a/main.go b/main.go index 9ec7157..f1af959 100644 --- a/main.go +++ b/main.go @@ -33,6 +33,8 @@ func main() { log.Printf("Started import file %s to db %s", *osmfile, *dbname) if *initial { log.Println("Initial import") + createIndexes(db) + log.Println("Indexes created") } else { log.Println("Diff import") } @@ -44,19 +46,8 @@ func main() { func read(db *mongo.Database, file string, initial bool, concurrency int, blockSize int) error { nodes := db.Collection("nodes") - simpleIndex(nodes, []string{"osm_id"}, true) - simpleIndex(nodes, []string{"tags"}, false) - geoIndex(nodes, "location") - ways := db.Collection("ways") - simpleIndex(ways, []string{"osm_id"}, true) - simpleIndex(ways, []string{"tags"}, false) - relations := db.Collection("relations") - simpleIndex(relations, []string{"osm_id"}, true) - simpleIndex(relations, []string{"tags"}, false) - simpleIndex(relations, []string{"members.ref"}, false) - geoIndex(relations, "members.coords") f, err := os.Open(file) if err != nil { @@ -72,7 +63,9 @@ func read(db *mongo.Database, file string, initial bool, concurrency int, blockS scanner := osmpbf.New(context.Background(), f, concurrency) defer scanner.Close() - buffer := make([]mongo.WriteModel, 0, blockSize) + bufferNodes := make([]mongo.WriteModel, 0, blockSize) + bufferWays := make([]mongo.WriteModel, 0, blockSize) + bufferRelations := make([]mongo.WriteModel, 0, blockSize) for scanner.Scan() { o := scanner.Object() switch o := o.(type) { @@ -93,13 +86,13 @@ func read(db *mongo.Database, file string, initial bool, concurrency int, blockS if initial { um := mongo.NewInsertOneModel() um.SetDocument(w) - buffer = append(buffer, um) + bufferWays = append(bufferWays, um) } else { um := mongo.NewUpdateOneModel() um.SetUpsert(true) um.SetUpdate(w) um.SetFilter(bson.M{"osm_id": w.OsmID}) - buffer = append(buffer, um) + bufferWays = append(bufferWays, um) } wc++ case *osm.Node: @@ -119,13 +112,13 @@ func read(db *mongo.Database, file string, initial bool, concurrency int, blockS if initial { um := mongo.NewInsertOneModel() um.SetDocument(n) - buffer = append(buffer, um) + bufferNodes = append(bufferNodes, um) } else { um := mongo.NewUpdateOneModel() um.SetUpsert(true) um.SetUpdate(n) um.SetFilter(bson.M{"osm_id": n.OsmID}) - buffer = append(buffer, um) + bufferNodes = append(bufferNodes, um) } nc++ case *osm.Relation: @@ -156,26 +149,52 @@ func read(db *mongo.Database, file string, initial bool, concurrency int, blockS if initial { um := mongo.NewInsertOneModel() um.SetDocument(r) - buffer = append(buffer, um) + bufferRelations = append(bufferRelations, um) } else { um := mongo.NewUpdateOneModel() um.SetUpsert(true) um.SetUpdate(r) um.SetFilter(bson.M{"osm_id": r.OsmID}) - buffer = append(buffer, um) + bufferRelations = append(bufferRelations, um) } rc++ } - if len(buffer) == blockSize { - if _, err := nodes.BulkWrite(context.Background(), buffer, opts); err != nil { + if len(bufferNodes) == blockSize { + if _, err := nodes.BulkWrite(context.Background(), bufferNodes, opts); err != nil { return err } - buffer = make([]mongo.WriteModel, 0, blockSize) + bufferNodes = make([]mongo.WriteModel, 0, blockSize) + log.Printf("Nodes: %d Ways: %d Relations: %d", nc, wc, rc) + } + if len(bufferWays) == blockSize { + if _, err := nodes.BulkWrite(context.Background(), bufferWays, opts); err != nil { + return err + } + bufferWays = make([]mongo.WriteModel, 0, blockSize) + log.Printf("Nodes: %d Ways: %d Relations: %d", nc, wc, rc) + } + if len(bufferRelations) == blockSize { + if _, err := nodes.BulkWrite(context.Background(), bufferRelations, opts); err != nil { + return err + } + bufferRelations = make([]mongo.WriteModel, 0, blockSize) log.Printf("Nodes: %d Ways: %d Relations: %d", nc, wc, rc) } } - if len(buffer) != 0 { - if _, err := nodes.BulkWrite(context.Background(), buffer, opts); err != nil { + if len(bufferNodes) != 0 { + if _, err := nodes.BulkWrite(context.Background(), bufferNodes, opts); err != nil { + return err + } + log.Printf("Nodes: %d Ways: %d Relations: %d", nc, wc, rc) + } + if len(bufferWays) != 0 { + if _, err := ways.BulkWrite(context.Background(), bufferWays, opts); err != nil { + return err + } + log.Printf("Nodes: %d Ways: %d Relations: %d", nc, wc, rc) + } + if len(bufferRelations) != 0 { + if _, err := relations.BulkWrite(context.Background(), bufferRelations, opts); err != nil { return err } log.Printf("Nodes: %d Ways: %d Relations: %d", nc, wc, rc) @@ -188,6 +207,23 @@ func read(db *mongo.Database, file string, initial bool, concurrency int, blockS return nil } +func createIndexes(db *mongo.Database) { + nodes := db.Collection("nodes") + simpleIndex(nodes, []string{"osm_id"}, true) + simpleIndex(nodes, []string{"tags"}, false) + geoIndex(nodes, "location") + + ways := db.Collection("ways") + simpleIndex(ways, []string{"osm_id"}, true) + simpleIndex(ways, []string{"tags"}, false) + + relations := db.Collection("relations") + simpleIndex(relations, []string{"osm_id"}, true) + simpleIndex(relations, []string{"tags"}, false) + simpleIndex(relations, []string{"members.ref"}, false) + geoIndex(relations, "members.coords") +} + func convertTags(tags osm.Tags) map[string]string { result := make(map[string]string, len(tags)) for _, t := range tags { -- cgit v1.2.3