about | summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author Alexander NeonXP Kiryukhin <a.kiryukhin@mail.ru> 2019-05-28 01:43:42 +0300
committer Alexander NeonXP Kiryukhin <a.kiryukhin@mail.ru> 2019-05-28 01:43:42 +0300
commit 7a2c6b69dc70e849de4ca54b2767c91ad0accf97 (patch)
tree 2860e7a5a87ad9fa2ca8bbef63e8b8b39ea7b90a
parent 2afa4e76fb01235fcaa2162cb3b29240df091369 (diff)
Fix bulk write
-rw-r--r-- main.go 82
1 files changed, 59 insertions, 23 deletions
diff --git a/main.go b/main.go
index 9ec7157..f1af959 100644
--- a/main.go
+++ b/main.go
@@ -33,6 +33,8 @@ func main() {
log.Printf("Started import file %s to db %s", *osmfile, *dbname)
if *initial {
log.Println("Initial import")
+ createIndexes(db)
+ log.Println("Indexes created")
} else {
log.Println("Diff import")
}
@@ -44,19 +46,8 @@ func main() {
func read(db *mongo.Database, file string, initial bool, concurrency int, blockSize int) error {
nodes := db.Collection("nodes")
- simpleIndex(nodes, []string{"osm_id"}, true)
- simpleIndex(nodes, []string{"tags"}, false)
- geoIndex(nodes, "location")
-
ways := db.Collection("ways")
- simpleIndex(ways, []string{"osm_id"}, true)
- simpleIndex(ways, []string{"tags"}, false)
-
relations := db.Collection("relations")
- simpleIndex(relations, []string{"osm_id"}, true)
- simpleIndex(relations, []string{"tags"}, false)
- simpleIndex(relations, []string{"members.ref"}, false)
- geoIndex(relations, "members.coords")
f, err := os.Open(file)
if err != nil {
@@ -72,7 +63,9 @@ func read(db *mongo.Database, file string, initial bool, concurrency int, blockS
scanner := osmpbf.New(context.Background(), f, concurrency)
defer scanner.Close()
- buffer := make([]mongo.WriteModel, 0, blockSize)
+ bufferNodes := make([]mongo.WriteModel, 0, blockSize)
+ bufferWays := make([]mongo.WriteModel, 0, blockSize)
+ bufferRelations := make([]mongo.WriteModel, 0, blockSize)
for scanner.Scan() {
o := scanner.Object()
switch o := o.(type) {
@@ -93,13 +86,13 @@ func read(db *mongo.Database, file string, initial bool, concurrency int, blockS
if initial {
um := mongo.NewInsertOneModel()
um.SetDocument(w)
- buffer = append(buffer, um)
+ bufferWays = append(bufferWays, um)
} else {
um := mongo.NewUpdateOneModel()
um.SetUpsert(true)
um.SetUpdate(w)
um.SetFilter(bson.M{"osm_id": w.OsmID})
- buffer = append(buffer, um)
+ bufferWays = append(bufferWays, um)
}
wc++
case *osm.Node:
@@ -119,13 +112,13 @@ func read(db *mongo.Database, file string, initial bool, concurrency int, blockS
if initial {
um := mongo.NewInsertOneModel()
um.SetDocument(n)
- buffer = append(buffer, um)
+ bufferNodes = append(bufferNodes, um)
} else {
um := mongo.NewUpdateOneModel()
um.SetUpsert(true)
um.SetUpdate(n)
um.SetFilter(bson.M{"osm_id": n.OsmID})
- buffer = append(buffer, um)
+ bufferNodes = append(bufferNodes, um)
}
nc++
case *osm.Relation:
@@ -156,26 +149,52 @@ func read(db *mongo.Database, file string, initial bool, concurrency int, blockS
if initial {
um := mongo.NewInsertOneModel()
um.SetDocument(r)
- buffer = append(buffer, um)
+ bufferRelations = append(bufferRelations, um)
} else {
um := mongo.NewUpdateOneModel()
um.SetUpsert(true)
um.SetUpdate(r)
um.SetFilter(bson.M{"osm_id": r.OsmID})
- buffer = append(buffer, um)
+ bufferRelations = append(bufferRelations, um)
}
rc++
}
- if len(buffer) == blockSize {
- if _, err := nodes.BulkWrite(context.Background(), buffer, opts); err != nil {
+ if len(bufferNodes) == blockSize {
+ if _, err := nodes.BulkWrite(context.Background(), bufferNodes, opts); err != nil {
return err
}
- buffer = make([]mongo.WriteModel, 0, blockSize)
+ bufferNodes = make([]mongo.WriteModel, 0, blockSize)
+ log.Printf("Nodes: %d Ways: %d Relations: %d", nc, wc, rc)
+ }
+ if len(bufferWays) == blockSize {
+ if _, err := nodes.BulkWrite(context.Background(), bufferWays, opts); err != nil {
+ return err
+ }
+ bufferWays = make([]mongo.WriteModel, 0, blockSize)
+ log.Printf("Nodes: %d Ways: %d Relations: %d", nc, wc, rc)
+ }
+ if len(bufferRelations) == blockSize {
+ if _, err := nodes.BulkWrite(context.Background(), bufferRelations, opts); err != nil {
+ return err
+ }
+ bufferRelations = make([]mongo.WriteModel, 0, blockSize)
log.Printf("Nodes: %d Ways: %d Relations: %d", nc, wc, rc)
}
}
- if len(buffer) != 0 {
- if _, err := nodes.BulkWrite(context.Background(), buffer, opts); err != nil {
+ if len(bufferNodes) != 0 {
+ if _, err := nodes.BulkWrite(context.Background(), bufferNodes, opts); err != nil {
+ return err
+ }
+ log.Printf("Nodes: %d Ways: %d Relations: %d", nc, wc, rc)
+ }
+ if len(bufferWays) != 0 {
+ if _, err := ways.BulkWrite(context.Background(), bufferWays, opts); err != nil {
+ return err
+ }
+ log.Printf("Nodes: %d Ways: %d Relations: %d", nc, wc, rc)
+ }
+ if len(bufferRelations) != 0 {
+ if _, err := relations.BulkWrite(context.Background(), bufferRelations, opts); err != nil {
return err
}
log.Printf("Nodes: %d Ways: %d Relations: %d", nc, wc, rc)
@@ -188,6 +207,23 @@ func read(db *mongo.Database, file string, initial bool, concurrency int, blockS
return nil
}
+func createIndexes(db *mongo.Database) {
+ nodes := db.Collection("nodes")
+ simpleIndex(nodes, []string{"osm_id"}, true)
+ simpleIndex(nodes, []string{"tags"}, false)
+ geoIndex(nodes, "location")
+
+ ways := db.Collection("ways")
+ simpleIndex(ways, []string{"osm_id"}, true)
+ simpleIndex(ways, []string{"tags"}, false)
+
+ relations := db.Collection("relations")
+ simpleIndex(relations, []string{"osm_id"}, true)
+ simpleIndex(relations, []string{"tags"}, false)
+ simpleIndex(relations, []string{"members.ref"}, false)
+ geoIndex(relations, "members.coords")
+}
+
func convertTags(tags osm.Tags) map[string]string {
result := make(map[string]string, len(tags))
for _, t := range tags {