aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--README.md5
-rw-r--r--main.go158
2 files changed, 99 insertions, 64 deletions
diff --git a/README.md b/README.md
index c52dd76..df0d88d 100644
--- a/README.md
+++ b/README.md
@@ -8,11 +8,14 @@ Simple loader from osm dump file to mongodb. Based on https://github.com/paulmac
## Usage
-`./osm2go -osmfile PATH_TO_OSM_FILE [-dbconnection mongodb://localhost:27017] [-dbname osm]`
+`./osm2go -osmfile PATH_TO_OSM_FILE [-dbconnection mongodb://localhost:27017] [-dbname osm] [-initial=true] [-concurrency=16] [-block=1000]`
* `osmfile` required, path to *.osm or *.osm.pbf file
* `dbconnection` optional, mongodb connection string (default: `mongodb://localhost:27017`)
* `dbname` optional, mongodb database name (default: `osm`)
+* `initial` optional, use insert instead upsert. Faster, but not check if item exists (default: `false`)
+* `concurrency` optional, parallel read processes (default, `16`)
+* `block` optional, block size to bulk write (default: `1000`)
## Example
diff --git a/main.go b/main.go
index 9ae13e9..9ec7157 100644
--- a/main.go
+++ b/main.go
@@ -3,7 +3,6 @@ package main
import (
"context"
"flag"
- "fmt"
"log"
"os"
"time"
@@ -18,8 +17,11 @@ import (
func main() {
dbconnection := flag.String("dbconnection", "mongodb://localhost:27017", "Mongo database name")
- dbname := flag.String("dbname", "osm", "Mongo database name")
+ dbname := flag.String("dbname", "map", "Mongo database name")
osmfile := flag.String("osmfile", "", "OSM file")
+ initial := flag.Bool("initial", false, "Is initial import")
+ concurrency := flag.Int("concurrency", 16, "Concurrency")
+ blockSize := flag.Int("block", 1000, "Block size to bulk write")
flag.Parse()
ctx, _ := context.WithTimeout(context.Background(), 10*time.Second)
client, err := mongo.Connect(ctx, options.Client().ApplyURI(*dbconnection))
@@ -28,67 +30,33 @@ func main() {
}
defer client.Disconnect(context.Background())
db := client.Database(*dbname)
- if err := read(db, *osmfile); err != nil {
+ log.Printf("Started import file %s to db %s", *osmfile, *dbname)
+ if *initial {
+ log.Println("Initial import")
+ } else {
+ log.Println("Diff import")
+ }
+ if err := read(db, *osmfile, *initial, *concurrency, *blockSize); err != nil {
log.Fatal(err)
}
}
-func read(db *mongo.Database, file string) error {
+func read(db *mongo.Database, file string, initial bool, concurrency int, blockSize int) error {
nodes := db.Collection("nodes")
- _, _ = nodes.Indexes().CreateOne(
- context.Background(),
- mongo.IndexModel{
- Keys: bsonx.Doc{{"osm_id", bsonx.Int32(1)}},
- Options: options.Index().SetUnique(true).SetSparse(true).SetBackground(true),
- },
- )
- _, _ = nodes.Indexes().CreateOne(
- context.Background(),
- mongo.IndexModel{
- Keys: bsonx.Doc{{"coords", bsonx.Int32(1)}},
- Options: options.Index().SetSphereVersion(2).SetSparse(true).SetBackground(true),
- },
- )
+ simpleIndex(nodes, []string{"osm_id"}, true)
+ simpleIndex(nodes, []string{"tags"}, false)
+ geoIndex(nodes, "location")
ways := db.Collection("ways")
- _, _ = ways.Indexes().CreateOne(
- context.Background(),
- mongo.IndexModel{
- Keys: bsonx.Doc{{"osm_id", bsonx.Int32(1)}},
- Options: options.Index().SetUnique(true).SetSparse(true).SetBackground(true),
- },
- )
- _, _ = ways.Indexes().CreateOne(
- context.Background(),
- mongo.IndexModel{
- Keys: bsonx.Doc{{"nodes", bsonx.Int32(1)}},
- Options: options.Index().SetSparse(true).SetBackground(true),
- },
- )
+ simpleIndex(ways, []string{"osm_id"}, true)
+ simpleIndex(ways, []string{"tags"}, false)
relations := db.Collection("relations")
- _, _ = relations.Indexes().CreateOne(
- context.Background(),
- mongo.IndexModel{
- Keys: bsonx.Doc{{"osm_id", bsonx.Int32(1)}},
- Options: options.Index().SetUnique(true).SetSparse(true).SetBackground(true),
- },
- )
- _, _ = relations.Indexes().CreateOne(
- context.Background(),
- mongo.IndexModel{
- Keys: bsonx.Doc{{"members.ref", bsonx.Int32(1)}},
- Options: options.Index().SetUnique(true).SetSparse(true).SetBackground(true),
- },
- )
- _, _ = relations.Indexes().CreateOne(
- context.Background(),
- mongo.IndexModel{
- Keys: bsonx.Doc{{"members.coords", bsonx.Int32(1)}},
- Options: options.Index().SetSphereVersion(2).SetSparse(true).SetBackground(true),
- },
- )
+ simpleIndex(relations, []string{"osm_id"}, true)
+ simpleIndex(relations, []string{"tags"}, false)
+ simpleIndex(relations, []string{"members.ref"}, false)
+ geoIndex(relations, "members.coords")
f, err := os.Open(file)
if err != nil {
@@ -96,14 +64,15 @@ func read(db *mongo.Database, file string) error {
}
defer f.Close()
- opts := (new(options.ReplaceOptions)).SetUpsert(true)
+ opts := (new(options.BulkWriteOptions)).SetOrdered(false)
nc := 0
wc := 0
rc := 0
- scanner := osmpbf.New(context.Background(), f, 3)
+ scanner := osmpbf.New(context.Background(), f, concurrency)
defer scanner.Close()
+ buffer := make([]mongo.WriteModel, 0, blockSize)
for scanner.Scan() {
o := scanner.Object()
switch o := o.(type) {
@@ -112,6 +81,7 @@ func read(db *mongo.Database, file string) error {
for _, v := range o.Nodes {
nodes = append(nodes, int64(v.ID))
}
+
w := Way{
OsmID: int64(o.ID),
Tags: convertTags(o.Tags),
@@ -120,8 +90,16 @@ func read(db *mongo.Database, file string) error {
Version: o.Version,
Visible: o.Visible,
}
- if _, err = ways.ReplaceOne(context.Background(), bson.M{"osm_id": int64(o.ID)}, w, opts); err != nil {
- return err
+ if initial {
+ um := mongo.NewInsertOneModel()
+ um.SetDocument(w)
+ buffer = append(buffer, um)
+ } else {
+ um := mongo.NewUpdateOneModel()
+ um.SetUpsert(true)
+ um.SetUpdate(w)
+ um.SetFilter(bson.M{"osm_id": w.OsmID})
+ buffer = append(buffer, um)
}
wc++
case *osm.Node:
@@ -138,8 +116,16 @@ func read(db *mongo.Database, file string) error {
Timestamp: o.Timestamp,
Visible: o.Visible,
}
- if _, err = nodes.ReplaceOne(context.Background(), bson.M{"osm_id": int64(o.ID)}, n, opts); err != nil {
- return err
+ if initial {
+ um := mongo.NewInsertOneModel()
+ um.SetDocument(n)
+ buffer = append(buffer, um)
+ } else {
+ um := mongo.NewUpdateOneModel()
+ um.SetUpsert(true)
+ um.SetUpdate(n)
+ um.SetFilter(bson.M{"osm_id": n.OsmID})
+ buffer = append(buffer, um)
}
nc++
case *osm.Relation:
@@ -167,14 +153,34 @@ func read(db *mongo.Database, file string) error {
Visible: o.Visible,
Members: members,
}
- if _, err = relations.ReplaceOne(context.Background(), bson.M{"osm_id": int64(o.ID)}, r, opts); err != nil {
- return err
+ if initial {
+ um := mongo.NewInsertOneModel()
+ um.SetDocument(r)
+ buffer = append(buffer, um)
+ } else {
+ um := mongo.NewUpdateOneModel()
+ um.SetUpsert(true)
+ um.SetUpdate(r)
+ um.SetFilter(bson.M{"osm_id": r.OsmID})
+ buffer = append(buffer, um)
}
rc++
}
- fmt.Printf("\rNodes: %d Ways: %d Relations: %d", nc, wc, rc)
+ if len(buffer) == blockSize {
+ if _, err := nodes.BulkWrite(context.Background(), buffer, opts); err != nil {
+ return err
+ }
+ buffer = make([]mongo.WriteModel, 0, blockSize)
+ log.Printf("Nodes: %d Ways: %d Relations: %d", nc, wc, rc)
+ }
}
-
+ if len(buffer) != 0 {
+ if _, err := nodes.BulkWrite(context.Background(), buffer, opts); err != nil {
+ return err
+ }
+ log.Printf("Nodes: %d Ways: %d Relations: %d", nc, wc, rc)
+ }
+ log.Println("Import done")
scanErr := scanner.Err()
if scanErr != nil {
return scanErr
@@ -189,3 +195,29 @@ func convertTags(tags osm.Tags) map[string]string {
}
return result
}
+
+func simpleIndex(col *mongo.Collection, keys []string, unique bool) {
+ idxKeys := bsonx.Doc{}
+ for _, e := range keys {
+ idxKeys.Append(e, bsonx.Int32(1))
+ }
+ _, _ = col.Indexes().CreateOne(
+ context.Background(),
+ mongo.IndexModel{
+ Keys: idxKeys,
+ Options: options.Index().SetUnique(unique).SetSparse(true).SetBackground(true),
+ },
+ )
+}
+
+func geoIndex(col *mongo.Collection, key string) {
+ _, _ = col.Indexes().CreateOne(
+ context.Background(),
+ mongo.IndexModel{
+ Keys: bsonx.Doc{{
+ Key: key, Value: bsonx.Int32(1),
+ }},
+ Options: options.Index().SetSphereVersion(2).SetSparse(true).SetBackground(true),
+ },
+ )
+}