about | summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Alexander NeonXP Kiryukhin <a.kiryukhin@mail.ru>, 2019-06-03 13:59:49 +0300
committer: Alexander NeonXP Kiryukhin <a.kiryukhin@mail.ru>, 2019-06-03 13:59:49 +0300
commit: c36227104708bf48c95d2b61ba9176370eb0baf5 (patch)
tree: 47e0a0373956556955f1dcfb8f00421a6c714025
parent: 2f4b6e90597784a8a7c01027e0ff5c6b69634a96 (diff)
Store objects on single collections (overpass like json) [HEAD, master]
Filter deleted objects
-rw-r--r-- README.md | 22
-rw-r--r-- indexes.go | 58
-rw-r--r-- main.go | 19
-rw-r--r-- models.go | 49
-rw-r--r-- reader.go | 39
-rw-r--r-- writer.go | 89
6 files changed, 78 insertions, 198 deletions
diff --git a/README.md b/README.md
index 72eb5c8..8f5f3d3 100644
--- a/README.md
+++ b/README.md
@@ -4,22 +4,22 @@ Simple loader from osm dump file to mongodb. Based on https://github.com/paulmac
## Build
-`go build -o osm2go`
+`go build -o osm2mgo`
## Usage
-`./osm2go -osmfile=PATH_TO_OSM_FILE`
+`./osm2mgo flags`
-All flags:
+### Flags:
-* `-osmfile` (required) OSM file
-* `-initial` (default:false) Is initial import (uses insert, not upsert)
-* `-indexes` (default:false) Create indexes (needs only first time)
-* `-dbconnection` (default:"mongodb://localhost:27017") Mongo database name
-* `-dbname` (default:"map") Mongo database name
-* `-layers` (default:"nodes,ways,relations") Layers to import
-* `-block` (default:1000) Block size to bulk write
-* `-concurrency` (default:32) Concurrency read and write
+* `-osmfile string` Path to OSM file (PBF format only) (default "./RU.osm.pbf")
+* `-dbconnection string` Mongo database name (default "mongodb://localhost:27017")
+* `-dbname string` Mongo database name (default "map")
+* `-initial` Is initial import?
+* `-indexes` Create indexes
+* `-layers string` Layers to import (default "nodes,ways,relations")
+* `-concurrency int` Workers count (default 32)
+* `-block int` Block size to bulk write (default 1000)
## Example
diff --git a/indexes.go b/indexes.go
index d8dbb32..fb07886 100644
--- a/indexes.go
+++ b/indexes.go
@@ -2,7 +2,6 @@ package main
import (
"context"
- "log"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
@@ -11,58 +10,9 @@ import (
func createIndexes(db *mongo.Database) error {
opts := options.CreateIndexes().SetMaxTime(1000)
- nodes := db.Collection("nodes")
- log.Println("creating indexes for nodes")
- created, err := nodes.Indexes().CreateMany(context.Background(), []mongo.IndexModel{
+ nodes := db.Collection("objects")
+ _, err := nodes.Indexes().CreateMany(context.Background(), []mongo.IndexModel{
{
- Keys: bsonx.Doc{{"osm_id", bsonx.Int32(-1)}},
- Options: (options.Index()).SetBackground(true).SetSparse(true).SetUnique(false),
- }, {
- Keys: bsonx.Doc{{"osm_id", bsonx.Int32(-1)}, {"version", bsonx.Int32(-1)}},
- Options: (options.Index()).SetBackground(true).SetSparse(true).SetUnique(false),
- }, {
- Keys: bsonx.Doc{{"tags", bsonx.Int32(-1)}},
- Options: (options.Index()).SetBackground(true).SetSparse(true),
- },
- }, opts)
- if err != nil {
- return err
- }
- log.Println(created)
- log.Println("creating geoindexes for nodes")
- if err := geoIndex(nodes, "location"); err != nil {
- return err
- }
-
- log.Println("creating indexes for ways")
- ways := db.Collection("ways")
- created, err = ways.Indexes().CreateMany(context.Background(), []mongo.IndexModel{
- {
- Keys: bsonx.Doc{{"osm_id", bsonx.Int32(-1)}},
- Options: (options.Index()).SetBackground(true).SetSparse(true).SetUnique(false),
- }, {
- Keys: bsonx.Doc{{"osm_id", bsonx.Int32(-1)}, {"version", bsonx.Int32(-1)}},
- Options: (options.Index()).SetBackground(true).SetSparse(true).SetUnique(false),
- }, {
- Keys: bsonx.Doc{{"tags", bsonx.Int32(-1)}},
- Options: (options.Index()).SetBackground(true).SetSparse(true),
- },
- }, opts)
- if err != nil {
- return err
- }
- log.Println(created)
-
- relations := db.Collection("relations")
- log.Println("creating geoindexes for relations")
- created, err = relations.Indexes().CreateMany(context.Background(), []mongo.IndexModel{
- {
- Keys: bsonx.Doc{{"osm_id", bsonx.Int32(-1)}},
- Options: (options.Index()).SetBackground(true).SetSparse(true).SetUnique(false),
- }, {
- Keys: bsonx.Doc{{"osm_id", bsonx.Int32(-1)}, {"version", bsonx.Int32(-1)}},
- Options: (options.Index()).SetBackground(true).SetSparse(true).SetUnique(false),
- }, {
Keys: bsonx.Doc{{"tags", bsonx.Int32(-1)}},
Options: (options.Index()).SetBackground(true).SetSparse(true),
},
@@ -74,11 +24,9 @@ func createIndexes(db *mongo.Database) error {
if err != nil {
return err
}
- log.Println(created)
- if err := geoIndex(relations, "members.coords"); err != nil {
+ if err := geoIndex(nodes, "location"); err != nil {
return err
}
- log.Println("indexes created")
return nil
}
diff --git a/main.go b/main.go
index e5dab33..f975187 100644
--- a/main.go
+++ b/main.go
@@ -16,12 +16,12 @@ import (
func main() {
dbconnection := flag.String("dbconnection", "mongodb://localhost:27017", "Mongo database name")
dbname := flag.String("dbname", "map", "Mongo database name")
- osmfile := flag.String("osmfile", "", "OSM file")
- initial := flag.Bool("initial", false, "Is initial import")
+ osmfile := flag.String("osmfile", "./RU.osm.pbf", "Path to OSM file (PBF format only)")
+ initial := flag.Bool("initial", false, "Is initial import?")
indexes := flag.Bool("indexes", false, "Create indexes")
layersString := flag.String("layers", "nodes,ways,relations", "Layers to import")
blockSize := flag.Int("block", 1000, "Block size to bulk write")
- concurrency := flag.Int("concurrency", 32, "Concurrency read and write")
+ concurrency := flag.Int("concurrency", 32, "Workers count")
flag.Parse()
layers := strings.Split(*layersString, ",")
r := rutina.New()
@@ -34,26 +34,25 @@ func main() {
db := client.Database(*dbname)
if *indexes {
+ log.Println("Creating indexes...")
if err := createIndexes(db); err != nil {
log.Fatal(err)
}
- log.Println("Indexes created")
+ log.Println("Done!")
}
- log.Printf("Started import file %s to db %s", *osmfile, *dbname)
- nodesCh := make(chan Node, 1)
- waysCh := make(chan Way, 1)
- relationsCh := make(chan Relation, 1)
+ log.Printf("Started import file %s to db %s (%d workers)", *osmfile, *dbname, *concurrency)
+ insertCh := make(chan Object, 1)
for i := 0; i < *concurrency; i++ {
worker := i
r.Go(func(ctx context.Context) error {
- return write(ctx, db, nodesCh, waysCh, relationsCh, *initial, *blockSize, worker)
+ return write(ctx, db, insertCh, *initial, *blockSize, worker)
})
}
r.Go(func(ctx context.Context) error {
- return read(ctx, *osmfile, nodesCh, waysCh, relationsCh, *concurrency, layers)
+ return read(ctx, *osmfile, insertCh, *concurrency, layers)
})
if err := r.Wait(); err != nil {
log.Fatal(err)
diff --git a/models.go b/models.go
index dd187d2..beee761 100644
--- a/models.go
+++ b/models.go
@@ -5,7 +5,6 @@ import (
"github.com/paulmach/orb"
"github.com/paulmach/osm"
- "go.mongodb.org/mongo-driver/bson/primitive"
)
type Coords struct {
@@ -13,34 +12,27 @@ type Coords struct {
Coordinates []float64 `json:"coordinates" bson:"coordinates"`
}
-type Node struct {
- ID primitive.ObjectID `json:"_id,omitempty" bson:"_id,omitempty"`
- OsmID int64 `json:"osm_id" bson:"osm_id"`
- Visible bool `json:"visible" bson:"visible"`
- Version int `json:"version,omitempty" bson:"version,omitempty"`
- Timestamp time.Time `json:"timestamp" bson:"timestamp"`
- Tags []Tag `json:"tags,omitempty" bson:"tags,omitempty"`
- Location Coords `json:"location" bson:"location"`
-}
+type ItemType string
+
+const (
+ NodeType ItemType = "node"
+ WayType ItemType = "way"
+ RelationType ItemType = "relation"
+)
-type Way struct {
- ID primitive.ObjectID `json:"_id,omitempty" bson:"_id,omitempty"`
- OsmID int64 `json:"osm_id" bson:"osm_id"`
- Visible bool `json:"visible" bson:"visible"`
- Version int `json:"version" bson:"version"`
- Timestamp time.Time `json:"timestamp" bson:"timestamp"`
- Nodes []int64 `json:"nodes" bson:"nodes"`
- Tags []Tag `json:"tags" bson:"tags"`
+type ID struct {
+ ID int64 `json:"id" bson:"id"`
+ Type ItemType `json:"type" bson:"type"`
+ Version int `json:"version" bson:"version"`
}
-type Relation struct {
- ID primitive.ObjectID `json:"_id,omitempty" bson:"_id,omitempty"`
- OsmID int64 `json:"osm_id" bson:"osm_id"`
- Visible bool `json:"visible" bson:"visible"`
- Version int `json:"version" bson:"version"`
- Timestamp time.Time `json:"timestamp" bson:"timestamp"`
- Members []Member `json:"members" bson:"members"`
- Tags []Tag `json:"tags" bson:"tags"`
+type Object struct {
+ ID ID `json:"_id" bson:"_id"`
+ Timestamp time.Time `json:"timestamp" bson:"timestamp"`
+ Tags []Tag `json:"tags" bson:"tags"`
+ Location Coords `json:"location,omitempty" bson:"location,omitempty"`
+ Nodes []int64 `json:"nodes,omitempty" bson:"nodes,omitempty"`
+ Members []Member `json:"members,omitempty" bson:"members,omitempty"`
}
type Member struct {
@@ -48,12 +40,11 @@ type Member struct {
Ref int64 `json:"ref" bson:"ref"`
Role string `json:"role" bson:"role"`
- Version int
- Location *Coords `json:"location,omitempty" bson:"location,omitempty"`
+ Location *Coords `json:"location" bson:"location"`
// Orientation is the direction of the way around a ring of a multipolygon.
// Only valid for multipolygon or boundary relations.
- Orientation orb.Orientation `json:"orienation,omitempty" bson:"orienation,omitempty"`
+ Orientation orb.Orientation `json:"orienation" bson:"orienation"`
}
type Tag struct {
diff --git a/reader.go b/reader.go
index 5ab603c..c273d3f 100644
--- a/reader.go
+++ b/reader.go
@@ -9,7 +9,7 @@ import (
"github.com/paulmach/osm/osmpbf"
)
-func read(ctx context.Context, file string, nodesCh chan Node, waysCh chan Way, relationsCh chan Relation, concurrency int, layers []string) error {
+func read(ctx context.Context, file string, insertCh chan Object, concurrency int, layers []string) error {
f, err := os.Open(file)
if err != nil {
return err
@@ -35,7 +35,7 @@ func read(ctx context.Context, file string, nodesCh chan Node, waysCh chan Way,
o := scanner.Object()
switch o := o.(type) {
case *osm.Way:
- if !layersToImport["ways"] {
+ if !layersToImport["ways"] || !o.Visible {
continue
}
nodes := make([]int64, 0, len(o.Nodes))
@@ -43,35 +43,31 @@ func read(ctx context.Context, file string, nodesCh chan Node, waysCh chan Way,
nodes = append(nodes, int64(v.ID))
}
- w := Way{
- OsmID: int64(o.ID),
+ w := Object{
+ ID: ID{ID: int64(o.ID), Type: WayType, Version: o.Version},
Tags: convertTags(o.Tags),
- Nodes: nodes,
Timestamp: o.Timestamp,
- Version: o.Version,
- Visible: o.Visible,
+ Nodes: nodes,
}
- waysCh <- w
+ insertCh <- w
case *osm.Node:
- if !layersToImport["nodes"] {
+ if !layersToImport["nodes"] || !o.Visible {
continue
}
- n := Node{
- OsmID: int64(o.ID),
+ w := Object{
+ ID: ID{ID: int64(o.ID), Type: NodeType, Version: o.Version},
+ Tags: convertTags(o.Tags),
+ Timestamp: o.Timestamp,
Location: Coords{
Type: "Point",
Coordinates: []float64{
o.Lon,
o.Lat,
}},
- Tags: convertTags(o.Tags),
- Version: o.Version,
- Timestamp: o.Timestamp,
- Visible: o.Visible,
}
- nodesCh <- n
+ insertCh <- w
case *osm.Relation:
- if !layersToImport["relations"] {
+ if !layersToImport["relations"] || !o.Visible {
continue
}
members := make([]Member, 0, len(o.Members))
@@ -87,22 +83,19 @@ func read(ctx context.Context, file string, nodesCh chan Node, waysCh chan Way,
}
members = append(members, Member{
Type: v.Type,
- Version: v.Version,
Orientation: v.Orientation,
Ref: v.Ref,
Role: v.Role,
Location: location,
})
}
- r := Relation{
- OsmID: int64(o.ID),
+ w := Object{
+ ID: ID{ID: int64(o.ID), Type: RelationType, Version: o.Version},
Tags: convertTags(o.Tags),
- Version: o.Version,
Timestamp: o.Timestamp,
- Visible: o.Visible,
Members: members,
}
- relationsCh <- r
+ insertCh <- w
}
}
log.Println("Read done")
diff --git a/writer.go b/writer.go
index fe82656..8322929 100644
--- a/writer.go
+++ b/writer.go
@@ -9,93 +9,42 @@ import (
"go.mongodb.org/mongo-driver/mongo/options"
)
-func write(ctx context.Context, db *mongo.Database, nodesCh chan Node, waysCh chan Way, relationsCh chan Relation, initial bool, blockSize int, worker int) error {
- nodes := db.Collection("nodes")
- ways := db.Collection("ways")
- relations := db.Collection("relations")
+func write(ctx context.Context, db *mongo.Database, insertCh chan Object, initial bool, blockSize int, worker int) error {
+ nodes := db.Collection("items")
opts := (new(options.BulkWriteOptions)).SetOrdered(false)
- nodesBuffer := make([]mongo.WriteModel, 0, blockSize)
- waysBuffer := make([]mongo.WriteModel, 0, blockSize)
- relationsBuffer := make([]mongo.WriteModel, 0, blockSize)
- nc := 0
- wc := 0
- rc := 0
+ buf := make([]mongo.WriteModel, 0, blockSize)
+ ic := 0
for {
select {
- case w := <-waysCh:
+ case w := <-insertCh:
if initial {
um := mongo.NewInsertOneModel()
um.SetDocument(w)
- waysBuffer = append(waysBuffer, um)
+ buf = append(buf, um)
} else {
um := mongo.NewUpdateOneModel()
um.SetUpsert(true)
um.SetUpdate(w)
- um.SetFilter(bson.M{"osm_id": w.OsmID})
- waysBuffer = append(waysBuffer, um)
- }
-
- case n := <-nodesCh:
- if initial {
- um := mongo.NewInsertOneModel()
- um.SetDocument(n)
- nodesBuffer = append(nodesBuffer, um)
- } else {
- um := mongo.NewUpdateOneModel()
- um.SetUpsert(true)
- um.SetUpdate(n)
- um.SetFilter(bson.M{"osm_id": n.OsmID})
- nodesBuffer = append(nodesBuffer, um)
- }
- case r := <-relationsCh:
- if initial {
- um := mongo.NewInsertOneModel()
- um.SetDocument(r)
- relationsBuffer = append(relationsBuffer, um)
- } else {
- um := mongo.NewUpdateOneModel()
- um.SetUpsert(true)
- um.SetUpdate(r)
- um.SetFilter(bson.M{"osm_id": r.OsmID})
- relationsBuffer = append(relationsBuffer, um)
+ um.SetFilter(bson.M{"osm_id": w.ID})
+ buf = append(buf, um)
}
case <-ctx.Done():
- log.Printf("[%d] saving last info in buffers...", worker)
- if _, err := nodes.BulkWrite(context.Background(), nodesBuffer, opts); err != nil {
- return err
- }
- if _, err := ways.BulkWrite(context.Background(), waysBuffer, opts); err != nil {
- return err
- }
- if _, err := relations.BulkWrite(context.Background(), relationsBuffer, opts); err != nil {
- return err
+ if len(buf) > 0 {
+ log.Printf("Worker: %d\tSaving last info in buffers...", worker)
+ if _, err := nodes.BulkWrite(context.Background(), buf, opts); err != nil {
+ return err
+ }
}
- log.Printf("[%d] Done", worker)
+ log.Printf("Worker: %d\tDone", worker)
return nil
}
- if len(nodesBuffer) == blockSize {
- nc++
- log.Printf("[%d] nodes %d ways %d relations %d", worker, nc, wc, rc)
- if _, err := nodes.BulkWrite(context.Background(), nodesBuffer, opts); err != nil {
- return err
- }
- nodesBuffer = make([]mongo.WriteModel, 0)
- }
- if len(waysBuffer) == blockSize {
- wc++
- log.Printf("[%d] nodes %d ways %d relations %d", worker, nc, wc, rc)
- if _, err := ways.BulkWrite(context.Background(), waysBuffer, opts); err != nil {
- return err
- }
- waysBuffer = make([]mongo.WriteModel, 0)
- }
- if len(relationsBuffer) == blockSize {
- rc++
- log.Printf("[%d] nodes %d ways %d relations %d", worker, nc, wc, rc)
- if _, err := relations.BulkWrite(context.Background(), relationsBuffer, opts); err != nil {
+ if len(buf) == blockSize {
+ ic++
+ log.Printf("Worker: %d\tWriting block %d (%d objects)", worker, ic, ic*blockSize)
+ if _, err := nodes.BulkWrite(context.Background(), buf, opts); err != nil {
return err
}
- relationsBuffer = make([]mongo.WriteModel, 0)
+ buf = make([]mongo.WriteModel, 0)
}
}
}