diff options
Diffstat (limited to 'main.go')
-rw-r--r-- | main.go | 158 |
1 files changed, 95 insertions, 63 deletions
@@ -3,7 +3,6 @@ package main import ( "context" "flag" - "fmt" "log" "os" "time" @@ -18,8 +17,11 @@ import ( func main() { dbconnection := flag.String("dbconnection", "mongodb://localhost:27017", "Mongo database name") - dbname := flag.String("dbname", "osm", "Mongo database name") + dbname := flag.String("dbname", "map", "Mongo database name") osmfile := flag.String("osmfile", "", "OSM file") + initial := flag.Bool("initial", false, "Is initial import") + concurrency := flag.Int("concurrency", 16, "Concurrency") + blockSize := flag.Int("block", 1000, "Block size to bulk write") flag.Parse() ctx, _ := context.WithTimeout(context.Background(), 10*time.Second) client, err := mongo.Connect(ctx, options.Client().ApplyURI(*dbconnection)) @@ -28,67 +30,33 @@ func main() { } defer client.Disconnect(context.Background()) db := client.Database(*dbname) - if err := read(db, *osmfile); err != nil { + log.Printf("Started import file %s to db %s", *osmfile, *dbname) + if *initial { + log.Println("Initial import") + } else { + log.Println("Diff import") + } + if err := read(db, *osmfile, *initial, *concurrency, *blockSize); err != nil { log.Fatal(err) } } -func read(db *mongo.Database, file string) error { +func read(db *mongo.Database, file string, initial bool, concurrency int, blockSize int) error { nodes := db.Collection("nodes") - _, _ = nodes.Indexes().CreateOne( - context.Background(), - mongo.IndexModel{ - Keys: bsonx.Doc{{"osm_id", bsonx.Int32(1)}}, - Options: options.Index().SetUnique(true).SetSparse(true).SetBackground(true), - }, - ) - _, _ = nodes.Indexes().CreateOne( - context.Background(), - mongo.IndexModel{ - Keys: bsonx.Doc{{"coords", bsonx.Int32(1)}}, - Options: options.Index().SetSphereVersion(2).SetSparse(true).SetBackground(true), - }, - ) + simpleIndex(nodes, []string{"osm_id"}, true) + simpleIndex(nodes, []string{"tags"}, false) + geoIndex(nodes, "location") ways := db.Collection("ways") - _, _ = ways.Indexes().CreateOne( - context.Background(), - mongo.IndexModel{ - Keys: bsonx.Doc{{"osm_id", bsonx.Int32(1)}}, - Options: options.Index().SetUnique(true).SetSparse(true).SetBackground(true), - }, - ) - _, _ = ways.Indexes().CreateOne( - context.Background(), - mongo.IndexModel{ - Keys: bsonx.Doc{{"nodes", bsonx.Int32(1)}}, - Options: options.Index().SetSparse(true).SetBackground(true), - }, - ) + simpleIndex(ways, []string{"osm_id"}, true) + simpleIndex(ways, []string{"tags"}, false) relations := db.Collection("relations") - _, _ = relations.Indexes().CreateOne( - context.Background(), - mongo.IndexModel{ - Keys: bsonx.Doc{{"osm_id", bsonx.Int32(1)}}, - Options: options.Index().SetUnique(true).SetSparse(true).SetBackground(true), - }, - ) - _, _ = relations.Indexes().CreateOne( - context.Background(), - mongo.IndexModel{ - Keys: bsonx.Doc{{"members.ref", bsonx.Int32(1)}}, - Options: options.Index().SetUnique(true).SetSparse(true).SetBackground(true), - }, - ) - _, _ = relations.Indexes().CreateOne( - context.Background(), - mongo.IndexModel{ - Keys: bsonx.Doc{{"members.coords", bsonx.Int32(1)}}, - Options: options.Index().SetSphereVersion(2).SetSparse(true).SetBackground(true), - }, - ) + simpleIndex(relations, []string{"osm_id"}, true) + simpleIndex(relations, []string{"tags"}, false) + simpleIndex(relations, []string{"members.ref"}, false) + geoIndex(relations, "members.coords") f, err := os.Open(file) if err != nil { @@ -96,14 +64,15 @@ func read(db *mongo.Database, file string) error { } defer f.Close() - opts := (new(options.ReplaceOptions)).SetUpsert(true) + opts := (new(options.BulkWriteOptions)).SetOrdered(false) nc := 0 wc := 0 rc := 0 - scanner := osmpbf.New(context.Background(), f, 3) + scanner := osmpbf.New(context.Background(), f, concurrency) defer scanner.Close() + buffer := make([]mongo.WriteModel, 0, blockSize) for scanner.Scan() { o := scanner.Object() switch o := o.(type) { @@ -112,6 +81,7 @@ func read(db *mongo.Database, file string) error { for _, v := range o.Nodes { nodes = append(nodes, int64(v.ID)) } + w := Way{ OsmID: int64(o.ID), Tags: convertTags(o.Tags), @@ -120,8 +90,16 @@ func read(db *mongo.Database, file string) error { Version: o.Version, Visible: o.Visible, } - if _, err = ways.ReplaceOne(context.Background(), bson.M{"osm_id": int64(o.ID)}, w, opts); err != nil { - return err + if initial { + um := mongo.NewInsertOneModel() + um.SetDocument(w) + buffer = append(buffer, um) + } else { + um := mongo.NewUpdateOneModel() + um.SetUpsert(true) + um.SetUpdate(w) + um.SetFilter(bson.M{"osm_id": w.OsmID}) + buffer = append(buffer, um) } wc++ case *osm.Node: @@ -138,8 +116,16 @@ func read(db *mongo.Database, file string) error { Timestamp: o.Timestamp, Visible: o.Visible, } - if _, err = nodes.ReplaceOne(context.Background(), bson.M{"osm_id": int64(o.ID)}, n, opts); err != nil { - return err + if initial { + um := mongo.NewInsertOneModel() + um.SetDocument(n) + buffer = append(buffer, um) + } else { + um := mongo.NewUpdateOneModel() + um.SetUpsert(true) + um.SetUpdate(n) + um.SetFilter(bson.M{"osm_id": n.OsmID}) + buffer = append(buffer, um) } nc++ case *osm.Relation: @@ -167,14 +153,34 @@ func read(db *mongo.Database, file string) error { Visible: o.Visible, Members: members, } - if _, err = relations.ReplaceOne(context.Background(), bson.M{"osm_id": int64(o.ID)}, r, opts); err != nil { - return err + if initial { + um := mongo.NewInsertOneModel() + um.SetDocument(r) + buffer = append(buffer, um) + } else { + um := mongo.NewUpdateOneModel() + um.SetUpsert(true) + um.SetUpdate(r) + um.SetFilter(bson.M{"osm_id": r.OsmID}) + buffer = append(buffer, um) } rc++ } - fmt.Printf("\rNodes: %d Ways: %d Relations: %d", nc, wc, rc) + if len(buffer) == blockSize { + if _, err := nodes.BulkWrite(context.Background(), buffer, opts); err != nil { + return err + } + buffer = make([]mongo.WriteModel, 0, blockSize) + log.Printf("Nodes: %d Ways: %d Relations: %d", nc, wc, rc) + } } - + if len(buffer) != 0 { + if _, err := nodes.BulkWrite(context.Background(), buffer, opts); err != nil { + return err + } + log.Printf("Nodes: %d Ways: %d Relations: %d", nc, wc, rc) + } + log.Println("Import done") scanErr := scanner.Err() if scanErr != nil { return scanErr @@ -189,3 +195,29 @@ func convertTags(tags osm.Tags) map[string]string { } return result } + +func simpleIndex(col *mongo.Collection, keys []string, unique bool) { + idxKeys := bsonx.Doc{} + for _, e := range keys { + idxKeys.Append(e, bsonx.Int32(1)) + } + _, _ = col.Indexes().CreateOne( + context.Background(), + mongo.IndexModel{ + Keys: idxKeys, + Options: options.Index().SetUnique(unique).SetSparse(true).SetBackground(true), + }, + ) +} + +func geoIndex(col *mongo.Collection, key string) { + _, _ = col.Indexes().CreateOne( + context.Background(), + mongo.IndexModel{ + Keys: bsonx.Doc{{ + Key: key, Value: bsonx.Int32(1), + }}, + Options: options.Index().SetSphereVersion(2).SetSparse(true).SetBackground(true), + }, + ) +} |