aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexander NeonXP Kiryukhin <a.kiryukhin@mail.ru>2019-05-30 14:05:08 +0300
committerAlexander NeonXP Kiryukhin <a.kiryukhin@mail.ru>2019-05-30 14:05:08 +0300
commit2f4b6e90597784a8a7c01027e0ff5c6b69634a96 (patch)
tree5d94daf9fc394425b03baf3915ffd03a0011e442
parent97fd40df4d85d91f3414dc88aa96222c1d827b49 (diff)
Parallel write
Speedup
-rw-r--r--README.md20
-rw-r--r--go.mod2
-rw-r--r--go.sum4
-rw-r--r--helpers.go14
-rw-r--r--indexes.go96
-rw-r--r--main.go306
-rw-r--r--models.go60
-rw-r--r--reader.go114
-rw-r--r--writer.go101
9 files changed, 394 insertions, 323 deletions
diff --git a/README.md b/README.md
index df0d88d..72eb5c8 100644
--- a/README.md
+++ b/README.md
@@ -8,14 +8,18 @@ Simple loader from osm dump file to mongodb. Based on https://github.com/paulmac
## Usage
-`./osm2go -osmfile PATH_TO_OSM_FILE [-dbconnection mongodb://localhost:27017] [-dbname osm] [-initial=true] [-concurrency=16] [-block=1000]`
-
-* `osmfile` required, path to *.osm or *.osm.pbf file
-* `dbconnection` optional, mongodb connection string (default: `mongodb://localhost:27017`)
-* `dbname` optional, mongodb database name (default: `osm`)
-* `initial` optional, use insert instead upsert. Faster, but not check if item exists (default: `false`)
-* `concurrency` optional, parallel read processes (default, `16`)
-* `block` optional, block size to bulk write (default: `1000`)
+`./osm2go -osmfile=PATH_TO_OSM_FILE`
+
+All flags:
+
+* `-osmfile` (required) OSM file
+* `-initial` (default:false) Is initial import (uses insert, not upsert)
+* `-indexes` (default:false) Create indexes (needs only first time)
+* `-dbconnection` (default:"mongodb://localhost:27017") Mongo connection string
+* `-dbname` (default:"map") Mongo database name
+* `-layers` (default:"nodes,ways,relations") Layers to import
+* `-block` (default:1000) Block size to bulk write
+* `-concurrency` (default:32) Concurrency of reads and writes
## Example
diff --git a/go.mod b/go.mod
index 06c3118..96aaa12 100644
--- a/go.mod
+++ b/go.mod
@@ -7,6 +7,8 @@ require (
github.com/gogo/protobuf v1.2.1 // indirect
github.com/golang/snappy v0.0.1 // indirect
github.com/google/go-cmp v0.3.0 // indirect
+ github.com/neonxp/rutina v0.4.3
+ github.com/paulmach/go.geojson v1.4.0
github.com/paulmach/orb v0.1.3
github.com/paulmach/osm v0.0.1
github.com/stretchr/testify v1.3.0 // indirect
diff --git a/go.sum b/go.sum
index 419da2d..744a494 100644
--- a/go.sum
+++ b/go.sum
@@ -10,6 +10,10 @@ github.com/google/go-cmp v0.3.0 h1:crn/baboCvb5fXaQ0IJ1SGTsTVrWpDsCWC8EGETZijY=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
+github.com/neonxp/rutina v0.4.3 h1:It7wu2L1FlPMC7UFGS7cTdMPWqd2hwL6+xP8UP72xZc=
+github.com/neonxp/rutina v0.4.3/go.mod h1:QJOHIcMI4Lh4Nyyi0v119KZllW1S5KxJyy/zg5KQXno=
+github.com/paulmach/go.geojson v1.4.0 h1:5x5moCkCtDo5x8af62P9IOAYGQcYHtxz2QJ3x1DoCgY=
+github.com/paulmach/go.geojson v1.4.0/go.mod h1:YaKx1hKpWF+T2oj2lFJPsW/t1Q5e1jQI61eoQSTwpIs=
github.com/paulmach/orb v0.1.3 h1:Wa1nzU269Zv7V9paVEY1COWW8FCqv4PC/KJRbJSimpM=
github.com/paulmach/orb v0.1.3/go.mod h1:VFlX/8C+IQ1p6FTRRKzKoOPJnvEtA5G0Veuqwbu//Vk=
github.com/paulmach/osm v0.0.1 h1:TxU/uZnJ+ssntblY6kSieOP4tQv31VW8wfyf4L3BeKs=
diff --git a/helpers.go b/helpers.go
new file mode 100644
index 0000000..b8672ac
--- /dev/null
+++ b/helpers.go
@@ -0,0 +1,14 @@
+package main
+
+import "github.com/paulmach/osm"
+
+func convertTags(tags osm.Tags) []Tag {
+ result := make([]Tag, 0, len(tags))
+ for _, t := range tags {
+ result = append(result, Tag{
+ Key: t.Key,
+ Value: t.Value,
+ })
+ }
+ return result
+}
diff --git a/indexes.go b/indexes.go
new file mode 100644
index 0000000..d8dbb32
--- /dev/null
+++ b/indexes.go
@@ -0,0 +1,96 @@
+package main
+
+import (
+ "context"
+ "log"
+
+ "go.mongodb.org/mongo-driver/mongo"
+ "go.mongodb.org/mongo-driver/mongo/options"
+ "go.mongodb.org/mongo-driver/x/bsonx"
+)
+
+func createIndexes(db *mongo.Database) error {
+ opts := options.CreateIndexes().SetMaxTime(1000)
+ nodes := db.Collection("nodes")
+ log.Println("creating indexes for nodes")
+ created, err := nodes.Indexes().CreateMany(context.Background(), []mongo.IndexModel{
+ {
+ Keys: bsonx.Doc{{"osm_id", bsonx.Int32(-1)}},
+ Options: (options.Index()).SetBackground(true).SetSparse(true).SetUnique(false),
+ }, {
+ Keys: bsonx.Doc{{"osm_id", bsonx.Int32(-1)}, {"version", bsonx.Int32(-1)}},
+ Options: (options.Index()).SetBackground(true).SetSparse(true).SetUnique(false),
+ }, {
+ Keys: bsonx.Doc{{"tags", bsonx.Int32(-1)}},
+ Options: (options.Index()).SetBackground(true).SetSparse(true),
+ },
+ }, opts)
+ if err != nil {
+ return err
+ }
+ log.Println(created)
+ log.Println("creating geoindexes for nodes")
+ if err := geoIndex(nodes, "location"); err != nil {
+ return err
+ }
+
+ log.Println("creating indexes for ways")
+ ways := db.Collection("ways")
+ created, err = ways.Indexes().CreateMany(context.Background(), []mongo.IndexModel{
+ {
+ Keys: bsonx.Doc{{"osm_id", bsonx.Int32(-1)}},
+ Options: (options.Index()).SetBackground(true).SetSparse(true).SetUnique(false),
+ }, {
+ Keys: bsonx.Doc{{"osm_id", bsonx.Int32(-1)}, {"version", bsonx.Int32(-1)}},
+ Options: (options.Index()).SetBackground(true).SetSparse(true).SetUnique(false),
+ }, {
+ Keys: bsonx.Doc{{"tags", bsonx.Int32(-1)}},
+ Options: (options.Index()).SetBackground(true).SetSparse(true),
+ },
+ }, opts)
+ if err != nil {
+ return err
+ }
+ log.Println(created)
+
+ relations := db.Collection("relations")
+ log.Println("creating geoindexes for relations")
+ created, err = relations.Indexes().CreateMany(context.Background(), []mongo.IndexModel{
+ {
+ Keys: bsonx.Doc{{"osm_id", bsonx.Int32(-1)}},
+ Options: (options.Index()).SetBackground(true).SetSparse(true).SetUnique(false),
+ }, {
+ Keys: bsonx.Doc{{"osm_id", bsonx.Int32(-1)}, {"version", bsonx.Int32(-1)}},
+ Options: (options.Index()).SetBackground(true).SetSparse(true).SetUnique(false),
+ }, {
+ Keys: bsonx.Doc{{"tags", bsonx.Int32(-1)}},
+ Options: (options.Index()).SetBackground(true).SetSparse(true),
+ },
+ {
+ Keys: bsonx.Doc{{"members.ref", bsonx.Int32(-1)}},
+ Options: (options.Index()).SetBackground(true).SetSparse(true),
+ },
+ }, opts)
+ if err != nil {
+ return err
+ }
+ log.Println(created)
+ if err := geoIndex(relations, "members.coords"); err != nil {
+ return err
+ }
+ log.Println("indexes created")
+ return nil
+}
+
+func geoIndex(col *mongo.Collection, key string) error {
+ _, err := col.Indexes().CreateOne(
+ context.Background(),
+ mongo.IndexModel{
+ Keys: bsonx.Doc{{
+ Key: key, Value: bsonx.String("2dsphere"),
+ }},
+ Options: options.Index().SetSphereVersion(2).SetSparse(true).SetBackground(true),
+ },
+ )
+ return err
+}
diff --git a/main.go b/main.go
index df5dcc1..e5dab33 100644
--- a/main.go
+++ b/main.go
@@ -4,15 +4,13 @@ import (
"context"
"flag"
"log"
- "os"
+ "strings"
"time"
- "github.com/paulmach/osm"
- "github.com/paulmach/osm/osmpbf"
- "go.mongodb.org/mongo-driver/bson"
+ "github.com/neonxp/rutina"
+ _ "go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
- "go.mongodb.org/mongo-driver/x/bsonx"
)
func main() {
@@ -20,10 +18,13 @@ func main() {
dbname := flag.String("dbname", "map", "Mongo database name")
osmfile := flag.String("osmfile", "", "OSM file")
initial := flag.Bool("initial", false, "Is initial import")
- indexes := flag.Bool("indexes", false, "Just create indexes")
- concurrency := flag.Int("concurrency", 16, "Concurrency")
+ indexes := flag.Bool("indexes", false, "Create indexes")
+ layersString := flag.String("layers", "nodes,ways,relations", "Layers to import")
blockSize := flag.Int("block", 1000, "Block size to bulk write")
+ concurrency := flag.Int("concurrency", 32, "Concurrency read and write")
flag.Parse()
+ layers := strings.Split(*layersString, ",")
+ r := rutina.New()
ctx, _ := context.WithTimeout(context.Background(), 10*time.Second)
client, err := mongo.Connect(ctx, options.Client().ApplyURI(*dbconnection))
if err != nil {
@@ -37,289 +38,24 @@ func main() {
log.Fatal(err)
}
log.Println("Indexes created")
- return
}
log.Printf("Started import file %s to db %s", *osmfile, *dbname)
-
- if err := read(db, *osmfile, *initial, *concurrency, *blockSize); err != nil {
- log.Fatal(err)
- }
-}
-
-func read(db *mongo.Database, file string, initial bool, concurrency int, blockSize int) error {
- nodes := db.Collection("nodes")
- ways := db.Collection("ways")
- relations := db.Collection("relations")
-
- f, err := os.Open(file)
- if err != nil {
- return err
- }
- defer f.Close()
-
- opts := (new(options.BulkWriteOptions)).SetOrdered(false)
- nc := 0
- wc := 0
- rc := 0
-
- scanner := osmpbf.New(context.Background(), f, concurrency)
- defer scanner.Close()
-
- bufferNodes := make([]mongo.WriteModel, 0, blockSize)
- bufferWays := make([]mongo.WriteModel, 0, blockSize)
- bufferRelations := make([]mongo.WriteModel, 0, blockSize)
- for scanner.Scan() {
- o := scanner.Object()
- switch o := o.(type) {
- case *osm.Way:
- nodes := make([]int64, 0, len(o.Nodes))
- for _, v := range o.Nodes {
- nodes = append(nodes, int64(v.ID))
- }
-
- w := Way{
- OsmID: int64(o.ID),
- Tags: convertTags(o.Tags),
- Nodes: nodes,
- Timestamp: o.Timestamp,
- Version: o.Version,
- Visible: o.Visible,
- }
- if initial {
- um := mongo.NewInsertOneModel()
- um.SetDocument(w)
- bufferWays = append(bufferWays, um)
- } else {
- um := mongo.NewUpdateOneModel()
- um.SetUpsert(true)
- um.SetUpdate(w)
- um.SetFilter(bson.M{"osm_id": w.OsmID})
- bufferWays = append(bufferWays, um)
- }
- wc++
- case *osm.Node:
- n := Node{
- OsmID: int64(o.ID),
- Location: Coords{
- Type: "Point",
- Coordinates: []float64{
- o.Lon,
- o.Lat,
- }},
- Tags: convertTags(o.Tags),
- Version: o.Version,
- Timestamp: o.Timestamp,
- Visible: o.Visible,
- }
- if initial {
- um := mongo.NewInsertOneModel()
- um.SetDocument(n)
- bufferNodes = append(bufferNodes, um)
- } else {
- um := mongo.NewUpdateOneModel()
- um.SetUpsert(true)
- um.SetUpdate(n)
- um.SetFilter(bson.M{"osm_id": n.OsmID})
- bufferNodes = append(bufferNodes, um)
- }
- nc++
- case *osm.Relation:
- members := make([]Member, len(o.Members))
- for _, v := range o.Members {
- members = append(members, Member{
- Type: v.Type,
- Version: v.Version,
- Orientation: v.Orientation,
- Ref: v.Ref,
- Role: v.Role,
- Location: Coords{
- Type: "Point",
- Coordinates: []float64{
- v.Lon,
- v.Lat,
- }},
- })
- }
- r := Relation{
- OsmID: int64(o.ID),
- Tags: convertTags(o.Tags),
- Version: o.Version,
- Timestamp: o.Timestamp,
- Visible: o.Visible,
- Members: members,
- }
- if initial {
- um := mongo.NewInsertOneModel()
- um.SetDocument(r)
- bufferRelations = append(bufferRelations, um)
- } else {
- um := mongo.NewUpdateOneModel()
- um.SetUpsert(true)
- um.SetUpdate(r)
- um.SetFilter(bson.M{"osm_id": r.OsmID})
- bufferRelations = append(bufferRelations, um)
- }
- rc++
- }
- if len(bufferNodes) == blockSize {
- if _, err := nodes.BulkWrite(context.Background(), bufferNodes, opts); err != nil {
- return err
- }
- bufferNodes = make([]mongo.WriteModel, 0, blockSize)
- log.Printf("Nodes: %d Ways: %d Relations: %d", nc, wc, rc)
- }
- if len(bufferWays) == blockSize {
- if _, err := nodes.BulkWrite(context.Background(), bufferWays, opts); err != nil {
- return err
- }
- bufferWays = make([]mongo.WriteModel, 0, blockSize)
- log.Printf("Nodes: %d Ways: %d Relations: %d", nc, wc, rc)
- }
- if len(bufferRelations) == blockSize {
- if _, err := nodes.BulkWrite(context.Background(), bufferRelations, opts); err != nil {
- return err
- }
- bufferRelations = make([]mongo.WriteModel, 0, blockSize)
- log.Printf("Nodes: %d Ways: %d Relations: %d", nc, wc, rc)
- }
- }
- if len(bufferNodes) != 0 {
- if _, err := nodes.BulkWrite(context.Background(), bufferNodes, opts); err != nil {
- return err
- }
- log.Printf("Nodes: %d Ways: %d Relations: %d", nc, wc, rc)
- }
- if len(bufferWays) != 0 {
- if _, err := ways.BulkWrite(context.Background(), bufferWays, opts); err != nil {
- return err
- }
- log.Printf("Nodes: %d Ways: %d Relations: %d", nc, wc, rc)
- }
- if len(bufferRelations) != 0 {
- if _, err := relations.BulkWrite(context.Background(), bufferRelations, opts); err != nil {
- return err
- }
- log.Printf("Nodes: %d Ways: %d Relations: %d", nc, wc, rc)
- }
- log.Println("Import done")
- scanErr := scanner.Err()
- if scanErr != nil {
- return scanErr
- }
- return nil
-}
-
-func createIndexes(db *mongo.Database) error {
- opts := options.CreateIndexes().SetMaxTime(1000)
- nodes := db.Collection("nodes")
- log.Println("creating indexes for nodes")
- created, err := nodes.Indexes().CreateMany(context.Background(), []mongo.IndexModel{
- {
- Keys: bsonx.Doc{{"osm_id", bsonx.Int32(-1)}},
- Options: (options.Index()).SetBackground(true).SetSparse(true).SetUnique(false),
- },
- {
- Keys: bsonx.Doc{
- {"tags.key", bsonx.Int32(-1)},
- {"tags.value", bsonx.Int32(-1)},
- },
- Options: (options.Index()).SetBackground(true).SetSparse(true),
- },
- }, opts)
- if err != nil {
- return err
- }
- log.Println(created)
- log.Println("creating geoindexes for nodes")
- if err := geoIndex(nodes, "location"); err != nil {
- return err
- }
-
- log.Println("creating indexes for ways")
- ways := db.Collection("ways")
- created, err = ways.Indexes().CreateMany(context.Background(), []mongo.IndexModel{
- {
- Keys: bsonx.Doc{{"osm_id", bsonx.Int32(-1)}},
- Options: (options.Index()).SetBackground(true).SetSparse(true).SetUnique(false),
- },
- {
- Keys: bsonx.Doc{
- {"tags.key", bsonx.Int32(-1)},
- {"tags.value", bsonx.Int32(-1)},
- },
- Options: (options.Index()).SetBackground(true).SetSparse(true),
- },
- }, opts)
- if err != nil {
- return err
- }
- log.Println(created)
- relations := db.Collection("relations")
- log.Println("creating geoindexes for relations")
- created, err = relations.Indexes().CreateMany(context.Background(), []mongo.IndexModel{
- {
- Keys: bsonx.Doc{{"osm_id", bsonx.Int32(-1)}},
- Options: (options.Index()).SetBackground(true).SetSparse(true).SetUnique(false),
- },
- {
- Keys: bsonx.Doc{
- {"tags.key", bsonx.Int32(-1)},
- {"tags.value", bsonx.Int32(-1)},
- },
- Options: (options.Index()).SetBackground(true).SetSparse(true),
- },
- {
- Keys: bsonx.Doc{{"members.ref", bsonx.Int32(-1)}},
- Options: (options.Index()).SetBackground(true).SetSparse(true),
- },
- }, opts)
- if err != nil {
- return err
- }
- log.Println(created)
- if err := geoIndex(relations, "members.coords"); err != nil {
- return err
- }
- log.Println("indexes created")
- return nil
-}
-
-func convertTags(tags osm.Tags) []Tag {
- result := make([]Tag, 0, len(tags))
- for _, t := range tags {
- result = append(result, Tag{
- Key: t.Key,
- Value: t.Value,
+ nodesCh := make(chan Node, 1)
+ waysCh := make(chan Way, 1)
+ relationsCh := make(chan Relation, 1)
+
+ for i := 0; i < *concurrency; i++ {
+ worker := i
+ r.Go(func(ctx context.Context) error {
+ return write(ctx, db, nodesCh, waysCh, relationsCh, *initial, *blockSize, worker)
})
}
- return result
-}
-func simpleIndex(col *mongo.Collection, keys []string, unique bool) error {
- idxKeys := bsonx.Doc{}
- for _, e := range keys {
- idxKeys.Append(e, bsonx.Int32(1))
+ r.Go(func(ctx context.Context) error {
+ return read(ctx, *osmfile, nodesCh, waysCh, relationsCh, *concurrency, layers)
+ })
+ if err := r.Wait(); err != nil {
+ log.Fatal(err)
}
- _, err := col.Indexes().CreateOne(
- context.Background(),
- mongo.IndexModel{
- Keys: idxKeys,
- Options: options.Index().SetUnique(unique).SetSparse(true).SetBackground(true),
- },
- )
- return err
-}
-
-func geoIndex(col *mongo.Collection, key string) error {
- _, err := col.Indexes().CreateOne(
- context.Background(),
- mongo.IndexModel{
- Keys: bsonx.Doc{{
- Key: key, Value: bsonx.Int32(1),
- }},
- Options: options.Index().SetSphereVersion(2).SetSparse(true).SetBackground(true),
- },
- )
- return err
}
diff --git a/models.go b/models.go
index c98ed2f..dd187d2 100644
--- a/models.go
+++ b/models.go
@@ -9,54 +9,54 @@ import (
)
type Coords struct {
- Type string `bson:"type"`
- Coordinates []float64 `bson:"coordinates"`
+ Type string `json:"type" bson:"type"`
+ Coordinates []float64 `json:"coordinates" bson:"coordinates"`
}
type Node struct {
- ID primitive.ObjectID `bson:"_id,omitempty"`
- OsmID int64 `bson:"osm_id"`
- Visible bool `bson:"visible"`
- Version int `bson:"version,omitempty"`
- Timestamp time.Time `bson:"timestamp"`
- Tags []Tag `bson:"tags,omitempty"`
- Location Coords `bson:"location"`
+ ID primitive.ObjectID `json:"_id,omitempty" bson:"_id,omitempty"`
+ OsmID int64 `json:"osm_id" bson:"osm_id"`
+ Visible bool `json:"visible" bson:"visible"`
+ Version int `json:"version,omitempty" bson:"version,omitempty"`
+ Timestamp time.Time `json:"timestamp" bson:"timestamp"`
+ Tags []Tag `json:"tags,omitempty" bson:"tags,omitempty"`
+ Location Coords `json:"location" bson:"location"`
}
type Way struct {
- ID primitive.ObjectID `bson:"_id,omitempty"`
- OsmID int64 `bson:"osm_id"`
- Visible bool `bson:"visible"`
- Version int `bson:"version"`
- Timestamp time.Time `bson:"timestamp"`
- Nodes []int64 `bson:"nodes"`
- Tags []Tag `bson:"tags"`
+ ID primitive.ObjectID `json:"_id,omitempty" bson:"_id,omitempty"`
+ OsmID int64 `json:"osm_id" bson:"osm_id"`
+ Visible bool `json:"visible" bson:"visible"`
+ Version int `json:"version" bson:"version"`
+ Timestamp time.Time `json:"timestamp" bson:"timestamp"`
+ Nodes []int64 `json:"nodes" bson:"nodes"`
+ Tags []Tag `json:"tags" bson:"tags"`
}
type Relation struct {
- ID primitive.ObjectID `bson:"_id,omitempty"`
- OsmID int64 `bson:"osm_id"`
- Visible bool `bson:"visible"`
- Version int `bson:"version"`
- Timestamp time.Time `bson:"timestamp"`
- Members []Member `bson:"members"`
- Tags []Tag `bson:"tags"`
+ ID primitive.ObjectID `json:"_id,omitempty" bson:"_id,omitempty"`
+ OsmID int64 `json:"osm_id" bson:"osm_id"`
+ Visible bool `json:"visible" bson:"visible"`
+ Version int `json:"version" bson:"version"`
+ Timestamp time.Time `json:"timestamp" bson:"timestamp"`
+ Members []Member `json:"members" bson:"members"`
+ Tags []Tag `json:"tags" bson:"tags"`
}
type Member struct {
- Type osm.Type `bson:"type"`
- Ref int64 `bson:"ref"`
- Role string `bson:"role"`
+ Type osm.Type `json:"type" bson:"type"`
+ Ref int64 `json:"ref" bson:"ref"`
+ Role string `json:"role" bson:"role"`
Version int
- Location Coords `bson:"location"`
+ Location *Coords `json:"location,omitempty" bson:"location,omitempty"`
// Orientation is the direction of the way around a ring of a multipolygon.
// Only valid for multipolygon or boundary relations.
- Orientation orb.Orientation `bson:"orienation,omitempty"`
+ Orientation orb.Orientation `json:"orienation,omitempty" bson:"orienation,omitempty"`
}
type Tag struct {
- Key string `bson:"key"`
- Value string `bson:"value"`
+ Key string `json:"key" bson:"key"`
+ Value string `json:"value" bson:"value"`
}
diff --git a/reader.go b/reader.go
new file mode 100644
index 0000000..5ab603c
--- /dev/null
+++ b/reader.go
@@ -0,0 +1,114 @@
+package main
+
+import (
+ "context"
+ "log"
+ "os"
+
+ "github.com/paulmach/osm"
+ "github.com/paulmach/osm/osmpbf"
+)
+
+func read(ctx context.Context, file string, nodesCh chan Node, waysCh chan Way, relationsCh chan Relation, concurrency int, layers []string) error {
+ f, err := os.Open(file)
+ if err != nil {
+ return err
+ }
+ defer f.Close()
+ scanner := osmpbf.New(context.Background(), f, concurrency)
+ defer scanner.Close()
+
+ layersToImport := map[string]bool{
+ "ways": false,
+ "nodes": false,
+ "relations": false,
+ }
+
+ for _, l := range layers {
+ layersToImport[l] = true
+ }
+
+ for scanner.Scan() {
+ if ctx.Err() != nil {
+ return ctx.Err()
+ }
+ o := scanner.Object()
+ switch o := o.(type) {
+ case *osm.Way:
+ if !layersToImport["ways"] {
+ continue
+ }
+ nodes := make([]int64, 0, len(o.Nodes))
+ for _, v := range o.Nodes {
+ nodes = append(nodes, int64(v.ID))
+ }
+
+ w := Way{
+ OsmID: int64(o.ID),
+ Tags: convertTags(o.Tags),
+ Nodes: nodes,
+ Timestamp: o.Timestamp,
+ Version: o.Version,
+ Visible: o.Visible,
+ }
+ waysCh <- w
+ case *osm.Node:
+ if !layersToImport["nodes"] {
+ continue
+ }
+ n := Node{
+ OsmID: int64(o.ID),
+ Location: Coords{
+ Type: "Point",
+ Coordinates: []float64{
+ o.Lon,
+ o.Lat,
+ }},
+ Tags: convertTags(o.Tags),
+ Version: o.Version,
+ Timestamp: o.Timestamp,
+ Visible: o.Visible,
+ }
+ nodesCh <- n
+ case *osm.Relation:
+ if !layersToImport["relations"] {
+ continue
+ }
+ members := make([]Member, 0, len(o.Members))
+ for _, v := range o.Members {
+ var location *Coords
+ if v.Lat != 0.0 && v.Lon != 0.0 {
+ location = &Coords{
+ Type: "Point",
+ Coordinates: []float64{
+ v.Lon,
+ v.Lat,
+ }}
+ }
+ members = append(members, Member{
+ Type: v.Type,
+ Version: v.Version,
+ Orientation: v.Orientation,
+ Ref: v.Ref,
+ Role: v.Role,
+ Location: location,
+ })
+ }
+ r := Relation{
+ OsmID: int64(o.ID),
+ Tags: convertTags(o.Tags),
+ Version: o.Version,
+ Timestamp: o.Timestamp,
+ Visible: o.Visible,
+ Members: members,
+ }
+ relationsCh <- r
+ }
+ }
+ log.Println("Read done")
+ scanErr := scanner.Err()
+ if scanErr != nil {
+ return scanErr
+ }
+ return nil
+}
diff --git a/writer.go b/writer.go
new file mode 100644
index 0000000..fe82656
--- /dev/null
+++ b/writer.go
@@ -0,0 +1,101 @@
+package main
+
+import (
+ "context"
+ "log"
+
+ "go.mongodb.org/mongo-driver/bson"
+ "go.mongodb.org/mongo-driver/mongo"
+ "go.mongodb.org/mongo-driver/mongo/options"
+)
+
+func write(ctx context.Context, db *mongo.Database, nodesCh chan Node, waysCh chan Way, relationsCh chan Relation, initial bool, blockSize int, worker int) error {
+ nodes := db.Collection("nodes")
+ ways := db.Collection("ways")
+ relations := db.Collection("relations")
+ opts := (new(options.BulkWriteOptions)).SetOrdered(false)
+ nodesBuffer := make([]mongo.WriteModel, 0, blockSize)
+ waysBuffer := make([]mongo.WriteModel, 0, blockSize)
+ relationsBuffer := make([]mongo.WriteModel, 0, blockSize)
+ nc := 0
+ wc := 0
+ rc := 0
+ for {
+ select {
+ case w := <-waysCh:
+ if initial {
+ um := mongo.NewInsertOneModel()
+ um.SetDocument(w)
+ waysBuffer = append(waysBuffer, um)
+ } else {
+ um := mongo.NewUpdateOneModel()
+ um.SetUpsert(true)
+ um.SetUpdate(w)
+ um.SetFilter(bson.M{"osm_id": w.OsmID})
+ waysBuffer = append(waysBuffer, um)
+ }
+
+ case n := <-nodesCh:
+ if initial {
+ um := mongo.NewInsertOneModel()
+ um.SetDocument(n)
+ nodesBuffer = append(nodesBuffer, um)
+ } else {
+ um := mongo.NewUpdateOneModel()
+ um.SetUpsert(true)
+ um.SetUpdate(n)
+ um.SetFilter(bson.M{"osm_id": n.OsmID})
+ nodesBuffer = append(nodesBuffer, um)
+ }
+ case r := <-relationsCh:
+ if initial {
+ um := mongo.NewInsertOneModel()
+ um.SetDocument(r)
+ relationsBuffer = append(relationsBuffer, um)
+ } else {
+ um := mongo.NewUpdateOneModel()
+ um.SetUpsert(true)
+ um.SetUpdate(r)
+ um.SetFilter(bson.M{"osm_id": r.OsmID})
+ relationsBuffer = append(relationsBuffer, um)
+ }
+ case <-ctx.Done():
+ log.Printf("[%d] saving last info in buffers...", worker)
+ if _, err := nodes.BulkWrite(context.Background(), nodesBuffer, opts); err != nil {
+ return err
+ }
+ if _, err := ways.BulkWrite(context.Background(), waysBuffer, opts); err != nil {
+ return err
+ }
+ if _, err := relations.BulkWrite(context.Background(), relationsBuffer, opts); err != nil {
+ return err
+ }
+ log.Printf("[%d] Done", worker)
+ return nil
+ }
+ if len(nodesBuffer) == blockSize {
+ nc++
+ log.Printf("[%d] nodes %d ways %d relations %d", worker, nc, wc, rc)
+ if _, err := nodes.BulkWrite(context.Background(), nodesBuffer, opts); err != nil {
+ return err
+ }
+ nodesBuffer = make([]mongo.WriteModel, 0)
+ }
+ if len(waysBuffer) == blockSize {
+ wc++
+ log.Printf("[%d] nodes %d ways %d relations %d", worker, nc, wc, rc)
+ if _, err := ways.BulkWrite(context.Background(), waysBuffer, opts); err != nil {
+ return err
+ }
+ waysBuffer = make([]mongo.WriteModel, 0)
+ }
+ if len(relationsBuffer) == blockSize {
+ rc++
+ log.Printf("[%d] nodes %d ways %d relations %d", worker, nc, wc, rc)
+ if _, err := relations.BulkWrite(context.Background(), relationsBuffer, opts); err != nil {
+ return err
+ }
+ relationsBuffer = make([]mongo.WriteModel, 0)
+ }
+ }
+}