From c36227104708bf48c95d2b61ba9176370eb0baf5 Mon Sep 17 00:00:00 2001 From: Alexander NeonXP Kiryukhin Date: Mon, 3 Jun 2019 13:59:49 +0300 Subject: Store objects on single collections (overpass like json) Filter deleted objects --- README.md | 22 ++++++++-------- indexes.go | 58 +++------------------------------------- main.go | 19 +++++++------- models.go | 49 ++++++++++++++-------------------- reader.go | 39 +++++++++++---------------- writer.go | 89 ++++++++++++++------------------------------------------------ 6 files changed, 78 insertions(+), 198 deletions(-) diff --git a/README.md b/README.md index 72eb5c8..8f5f3d3 100644 --- a/README.md +++ b/README.md @@ -4,22 +4,22 @@ Simple loader from osm dump file to mongodb. Based on https://github.com/paulmac ## Build -`go build -o osm2go` +`go build -o osm2mgo` ## Usage -`./osm2go -osmfile=PATH_TO_OSM_FILE` +`./osm2mgo flags` -All flags: +### Flags: -* `-osmfile` (required) OSM file -* `-initial` (default:false) Is initial import (uses insert, not upsert) -* `-indexes` (default:false) Create indexes (needs only first time) -* `-dbconnection` (default:"mongodb://localhost:27017") Mongo database name -* `-dbname` (default:"map") Mongo database name -* `-layers` (default:"nodes,ways,relations") Layers to import -* `-block` (default:1000) Block size to bulk write -* `-concurrency` (default:32) Concurrency read and write +* `-osmfile string` Path to OSM file (PBF format only) (default "./RU.osm.pbf") +* `-dbconnection string` Mongo database name (default "mongodb://localhost:27017") +* `-dbname string` Mongo database name (default "map") +* `-initial` Is initial import? +* `-indexes` Create indexes +* `-layers string` Layers to import (default "nodes,ways,relations") +* `-concurrency int` Workers count (default 32) +* `-block int` Block size to bulk write (default 1000) ## Example diff --git a/indexes.go b/indexes.go index d8dbb32..fb07886 100644 --- a/indexes.go +++ b/indexes.go @@ -2,7 +2,6 @@ package main import ( "context" - "log" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" @@ -11,58 +10,9 @@ import ( func createIndexes(db *mongo.Database) error { opts := options.CreateIndexes().SetMaxTime(1000) - nodes := db.Collection("nodes") - log.Println("creating indexes for nodes") - created, err := nodes.Indexes().CreateMany(context.Background(), []mongo.IndexModel{ + nodes := db.Collection("objects") + _, err := nodes.Indexes().CreateMany(context.Background(), []mongo.IndexModel{ { - Keys: bsonx.Doc{{"osm_id", bsonx.Int32(-1)}}, - Options: (options.Index()).SetBackground(true).SetSparse(true).SetUnique(false), - }, { - Keys: bsonx.Doc{{"osm_id", bsonx.Int32(-1)}, {"version", bsonx.Int32(-1)}}, - Options: (options.Index()).SetBackground(true).SetSparse(true).SetUnique(false), - }, { - Keys: bsonx.Doc{{"tags", bsonx.Int32(-1)}}, - Options: (options.Index()).SetBackground(true).SetSparse(true), - }, - }, opts) - if err != nil { - return err - } - log.Println(created) - log.Println("creating geoindexes for nodes") - if err := geoIndex(nodes, "location"); err != nil { - return err - } - - log.Println("creating indexes for ways") - ways := db.Collection("ways") - created, err = ways.Indexes().CreateMany(context.Background(), []mongo.IndexModel{ - { - Keys: bsonx.Doc{{"osm_id", bsonx.Int32(-1)}}, - Options: (options.Index()).SetBackground(true).SetSparse(true).SetUnique(false), - }, { - Keys: bsonx.Doc{{"osm_id", bsonx.Int32(-1)}, {"version", bsonx.Int32(-1)}}, - Options: (options.Index()).SetBackground(true).SetSparse(true).SetUnique(false), - }, { - Keys: bsonx.Doc{{"tags", bsonx.Int32(-1)}}, - Options: (options.Index()).SetBackground(true).SetSparse(true), - }, - }, opts) - if err != nil { - return err - } - log.Println(created) - - relations := db.Collection("relations") - log.Println("creating geoindexes for relations") - created, err = relations.Indexes().CreateMany(context.Background(), []mongo.IndexModel{ - { - Keys: bsonx.Doc{{"osm_id", bsonx.Int32(-1)}}, - Options: (options.Index()).SetBackground(true).SetSparse(true).SetUnique(false), - }, { - Keys: bsonx.Doc{{"osm_id", bsonx.Int32(-1)}, {"version", bsonx.Int32(-1)}}, - Options: (options.Index()).SetBackground(true).SetSparse(true).SetUnique(false), - }, { Keys: bsonx.Doc{{"tags", bsonx.Int32(-1)}}, Options: (options.Index()).SetBackground(true).SetSparse(true), }, @@ -74,11 +24,9 @@ func createIndexes(db *mongo.Database) error { if err != nil { return err } - log.Println(created) - if err := geoIndex(relations, "members.coords"); err != nil { + if err := geoIndex(nodes, "location"); err != nil { return err } - log.Println("indexes created") return nil } diff --git a/main.go b/main.go index e5dab33..f975187 100644 --- a/main.go +++ b/main.go @@ -16,12 +16,12 @@ import ( func main() { dbconnection := flag.String("dbconnection", "mongodb://localhost:27017", "Mongo database name") dbname := flag.String("dbname", "map", "Mongo database name") - osmfile := flag.String("osmfile", "", "OSM file") - initial := flag.Bool("initial", false, "Is initial import") + osmfile := flag.String("osmfile", "./RU.osm.pbf", "Path to OSM file (PBF format only)") + initial := flag.Bool("initial", false, "Is initial import?") indexes := flag.Bool("indexes", false, "Create indexes") layersString := flag.String("layers", "nodes,ways,relations", "Layers to import") blockSize := flag.Int("block", 1000, "Block size to bulk write") - concurrency := flag.Int("concurrency", 32, "Concurrency read and write") + concurrency := flag.Int("concurrency", 32, "Workers count") flag.Parse() layers := strings.Split(*layersString, ",") r := rutina.New() @@ -34,26 +34,25 @@ func main() { db := client.Database(*dbname) if *indexes { + log.Println("Creating indexes...") if err := createIndexes(db); err != nil { log.Fatal(err) } - log.Println("Indexes created") + log.Println("Done!") } - log.Printf("Started import file %s to db %s", *osmfile, *dbname) - nodesCh := make(chan Node, 1) - waysCh := make(chan Way, 1) - relationsCh := make(chan Relation, 1) + log.Printf("Started import file %s to db %s (%d workers)", *osmfile, *dbname, *concurrency) + insertCh := make(chan Object, 1) for i := 0; i < *concurrency; i++ { worker := i r.Go(func(ctx context.Context) error { - return write(ctx, db, nodesCh, waysCh, relationsCh, *initial, *blockSize, worker) + return write(ctx, db, insertCh, *initial, *blockSize, worker) }) } r.Go(func(ctx context.Context) error { - return read(ctx, *osmfile, nodesCh, waysCh, relationsCh, *concurrency, layers) + return read(ctx, *osmfile, insertCh, *concurrency, layers) }) if err := r.Wait(); err != nil { log.Fatal(err) diff --git a/models.go b/models.go index dd187d2..beee761 100644 --- a/models.go +++ b/models.go @@ -5,7 +5,6 @@ import ( "github.com/paulmach/orb" "github.com/paulmach/osm" - "go.mongodb.org/mongo-driver/bson/primitive" ) type Coords struct { @@ -13,34 +12,27 @@ type Coords struct { Coordinates []float64 `json:"coordinates" bson:"coordinates"` } -type Node struct { - ID primitive.ObjectID `json:"_id,omitempty" bson:"_id,omitempty"` - OsmID int64 `json:"osm_id" bson:"osm_id"` - Visible bool `json:"visible" bson:"visible"` - Version int `json:"version,omitempty" bson:"version,omitempty"` - Timestamp time.Time `json:"timestamp" bson:"timestamp"` - Tags []Tag `json:"tags,omitempty" bson:"tags,omitempty"` - Location Coords `json:"location" bson:"location"` -} +type ItemType string + +const ( + NodeType ItemType = "node" + WayType ItemType = "way" + RelationType ItemType = "relation" +) -type Way struct { - ID primitive.ObjectID `json:"_id,omitempty" bson:"_id,omitempty"` - OsmID int64 `json:"osm_id" bson:"osm_id"` - Visible bool `json:"visible" bson:"visible"` - Version int `json:"version" bson:"version"` - Timestamp time.Time `json:"timestamp" bson:"timestamp"` - Nodes []int64 `json:"nodes" bson:"nodes"` - Tags []Tag `json:"tags" bson:"tags"` +type ID struct { + ID int64 `json:"id" bson:"id"` + Type ItemType `json:"type" bson:"type"` + Version int `json:"version" bson:"version"` } -type Relation struct { - ID primitive.ObjectID `json:"_id,omitempty" bson:"_id,omitempty"` - OsmID int64 `json:"osm_id" bson:"osm_id"` - Visible bool `json:"visible" bson:"visible"` - Version int `json:"version" bson:"version"` - Timestamp time.Time `json:"timestamp" bson:"timestamp"` - Members []Member `json:"members" bson:"members"` - Tags []Tag `json:"tags" bson:"tags"` +type Object struct { + ID ID `json:"_id" bson:"_id"` + Timestamp time.Time `json:"timestamp" bson:"timestamp"` + Tags []Tag `json:"tags" bson:"tags"` + Location Coords `json:"location,omitempty" bson:"location,omitempty"` + Nodes []int64 `json:"nodes,omitempty" bson:"nodes,omitempty"` + Members []Member `json:"members,omitempty" bson:"members,omitempty"` } type Member struct { @@ -48,12 +40,11 @@ type Member struct { Ref int64 `json:"ref" bson:"ref"` Role string `json:"role" bson:"role"` - Version int - Location *Coords `json:"location,omitempty" bson:"location,omitempty"` + Location *Coords `json:"location" bson:"location"` // Orientation is the direction of the way around a ring of a multipolygon. // Only valid for multipolygon or boundary relations. - Orientation orb.Orientation `json:"orienation,omitempty" bson:"orienation,omitempty"` + Orientation orb.Orientation `json:"orienation" bson:"orienation"` } type Tag struct { diff --git a/reader.go b/reader.go index 5ab603c..c273d3f 100644 --- a/reader.go +++ b/reader.go @@ -9,7 +9,7 @@ import ( "github.com/paulmach/osm/osmpbf" ) -func read(ctx context.Context, file string, nodesCh chan Node, waysCh chan Way, relationsCh chan Relation, concurrency int, layers []string) error { +func read(ctx context.Context, file string, insertCh chan Object, concurrency int, layers []string) error { f, err := os.Open(file) if err != nil { return err @@ -35,7 +35,7 @@ func read(ctx context.Context, file string, nodesCh chan Node, waysCh chan Way, o := scanner.Object() switch o := o.(type) { case *osm.Way: - if !layersToImport["ways"] { + if !layersToImport["ways"] || !o.Visible { continue } nodes := make([]int64, 0, len(o.Nodes)) @@ -43,35 +43,31 @@ func read(ctx context.Context, file string, nodesCh chan Node, waysCh chan Way, nodes = append(nodes, int64(v.ID)) } - w := Way{ - OsmID: int64(o.ID), + w := Object{ + ID: ID{ID: int64(o.ID), Type: WayType, Version: o.Version}, Tags: convertTags(o.Tags), - Nodes: nodes, Timestamp: o.Timestamp, - Version: o.Version, - Visible: o.Visible, + Nodes: nodes, } - waysCh <- w + insertCh <- w case *osm.Node: - if !layersToImport["nodes"] { + if !layersToImport["nodes"] || !o.Visible { continue } - n := Node{ - OsmID: int64(o.ID), + w := Object{ + ID: ID{ID: int64(o.ID), Type: NodeType, Version: o.Version}, + Tags: convertTags(o.Tags), + Timestamp: o.Timestamp, Location: Coords{ Type: "Point", Coordinates: []float64{ o.Lon, o.Lat, }}, - Tags: convertTags(o.Tags), - Version: o.Version, - Timestamp: o.Timestamp, - Visible: o.Visible, } - nodesCh <- n + insertCh <- w case *osm.Relation: - if !layersToImport["relations"] { + if !layersToImport["relations"] || !o.Visible { continue } members := make([]Member, 0, len(o.Members)) @@ -87,22 +83,19 @@ func read(ctx context.Context, file string, nodesCh chan Node, waysCh chan Way, } members = append(members, Member{ Type: v.Type, - Version: v.Version, Orientation: v.Orientation, Ref: v.Ref, Role: v.Role, Location: location, }) } - r := Relation{ - OsmID: int64(o.ID), + w := Object{ + ID: ID{ID: int64(o.ID), Type: RelationType, Version: o.Version}, Tags: convertTags(o.Tags), - Version: o.Version, Timestamp: o.Timestamp, - Visible: o.Visible, Members: members, } - relationsCh <- r + insertCh <- w } } log.Println("Read done") diff --git a/writer.go b/writer.go index fe82656..8322929 100644 --- a/writer.go +++ b/writer.go @@ -9,93 +9,42 @@ import ( "go.mongodb.org/mongo-driver/mongo/options" ) -func write(ctx context.Context, db *mongo.Database, nodesCh chan Node, waysCh chan Way, relationsCh chan Relation, initial bool, blockSize int, worker int) error { - nodes := db.Collection("nodes") - ways := db.Collection("ways") - relations := db.Collection("relations") +func write(ctx context.Context, db *mongo.Database, insertCh chan Object, initial bool, blockSize int, worker int) error { + nodes := db.Collection("items") opts := (new(options.BulkWriteOptions)).SetOrdered(false) - nodesBuffer := make([]mongo.WriteModel, 0, blockSize) - waysBuffer := make([]mongo.WriteModel, 0, blockSize) - relationsBuffer := make([]mongo.WriteModel, 0, blockSize) - nc := 0 - wc := 0 - rc := 0 + buf := make([]mongo.WriteModel, 0, blockSize) + ic := 0 for { select { - case w := <-waysCh: + case w := <-insertCh: if initial { um := mongo.NewInsertOneModel() um.SetDocument(w) - waysBuffer = append(waysBuffer, um) + buf = append(buf, um) } else { um := mongo.NewUpdateOneModel() um.SetUpsert(true) um.SetUpdate(w) - um.SetFilter(bson.M{"osm_id": w.OsmID}) - waysBuffer = append(waysBuffer, um) - } - - case n := <-nodesCh: - if initial { - um := mongo.NewInsertOneModel() - um.SetDocument(n) - nodesBuffer = append(nodesBuffer, um) - } else { - um := mongo.NewUpdateOneModel() - um.SetUpsert(true) - um.SetUpdate(n) - um.SetFilter(bson.M{"osm_id": n.OsmID}) - nodesBuffer = append(nodesBuffer, um) - } - case r := <-relationsCh: - if initial { - um := mongo.NewInsertOneModel() - um.SetDocument(r) - relationsBuffer = append(relationsBuffer, um) - } else { - um := mongo.NewUpdateOneModel() - um.SetUpsert(true) - um.SetUpdate(r) - um.SetFilter(bson.M{"osm_id": r.OsmID}) - relationsBuffer = append(relationsBuffer, um) + um.SetFilter(bson.M{"osm_id": w.ID}) + buf = append(buf, um) } case <-ctx.Done(): - log.Printf("[%d] saving last info in buffers...", worker) - if _, err := nodes.BulkWrite(context.Background(), nodesBuffer, opts); err != nil { - return err - } - if _, err := ways.BulkWrite(context.Background(), waysBuffer, opts); err != nil { - return err - } - if _, err := relations.BulkWrite(context.Background(), relationsBuffer, opts); err != nil { - return err + if len(buf) > 0 { + log.Printf("Worker: %d\tSaving last info in buffers...", worker) + if _, err := nodes.BulkWrite(context.Background(), buf, opts); err != nil { + return err + } } - log.Printf("[%d] Done", worker) + log.Printf("Worker: %d\tDone", worker) return nil } - if len(nodesBuffer) == blockSize { - nc++ - log.Printf("[%d] nodes %d ways %d relations %d", worker, nc, wc, rc) - if _, err := nodes.BulkWrite(context.Background(), nodesBuffer, opts); err != nil { - return err - } - nodesBuffer = make([]mongo.WriteModel, 0) - } - if len(waysBuffer) == blockSize { - wc++ - log.Printf("[%d] nodes %d ways %d relations %d", worker, nc, wc, rc) - if _, err := ways.BulkWrite(context.Background(), waysBuffer, opts); err != nil { - return err - } - waysBuffer = make([]mongo.WriteModel, 0) - } - if len(relationsBuffer) == blockSize { - rc++ - log.Printf("[%d] nodes %d ways %d relations %d", worker, nc, wc, rc) - if _, err := relations.BulkWrite(context.Background(), relationsBuffer, opts); err != nil { + if len(buf) == blockSize { + ic++ + log.Printf("Worker: %d\tWriting block %d (%d objects)", worker, ic, ic*blockSize) + if _, err := nodes.BulkWrite(context.Background(), buf, opts); err != nil { return err } - relationsBuffer = make([]mongo.WriteModel, 0) + buf = make([]mongo.WriteModel, 0) } } } -- cgit v1.2.3