Diffstat (limited to 'main.go')
-rw-r--r--  main.go | 306
1 file changed, 21 insertions(+), 285 deletions(-)
diff --git a/main.go b/main.go
index df5dcc1..e5dab33 100644
--- a/main.go
+++ b/main.go
@@ -4,15 +4,13 @@ import (
"context"
"flag"
"log"
- "os"
+ "strings"
"time"
- "github.com/paulmach/osm"
- "github.com/paulmach/osm/osmpbf"
- "go.mongodb.org/mongo-driver/bson"
+ "github.com/neonxp/rutina"
+ _ "go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
- "go.mongodb.org/mongo-driver/x/bsonx"
)
func main() {
@@ -20,10 +18,13 @@ func main() {
dbname := flag.String("dbname", "map", "Mongo database name")
osmfile := flag.String("osmfile", "", "OSM file")
initial := flag.Bool("initial", false, "Is initial import")
- indexes := flag.Bool("indexes", false, "Just create indexes")
- concurrency := flag.Int("concurrency", 16, "Concurrency")
+ indexes := flag.Bool("indexes", false, "Create indexes")
+ layersString := flag.String("layers", "nodes,ways,relations", "Layers to import")
blockSize := flag.Int("block", 1000, "Block size to bulk write")
+ concurrency := flag.Int("concurrency", 32, "Concurrency read and write")
flag.Parse()
+ layers := strings.Split(*layersString, ",")
+ r := rutina.New()
ctx, _ := context.WithTimeout(context.Background(), 10*time.Second)
client, err := mongo.Connect(ctx, options.Client().ApplyURI(*dbconnection))
if err != nil {
@@ -37,289 +38,24 @@ func main() {
log.Fatal(err)
}
log.Println("Indexes created")
- return
}
log.Printf("Started import file %s to db %s", *osmfile, *dbname)
-
- if err := read(db, *osmfile, *initial, *concurrency, *blockSize); err != nil {
- log.Fatal(err)
- }
-}
-
-func read(db *mongo.Database, file string, initial bool, concurrency int, blockSize int) error {
- nodes := db.Collection("nodes")
- ways := db.Collection("ways")
- relations := db.Collection("relations")
-
- f, err := os.Open(file)
- if err != nil {
- return err
- }
- defer f.Close()
-
- opts := (new(options.BulkWriteOptions)).SetOrdered(false)
- nc := 0
- wc := 0
- rc := 0
-
- scanner := osmpbf.New(context.Background(), f, concurrency)
- defer scanner.Close()
-
- bufferNodes := make([]mongo.WriteModel, 0, blockSize)
- bufferWays := make([]mongo.WriteModel, 0, blockSize)
- bufferRelations := make([]mongo.WriteModel, 0, blockSize)
- for scanner.Scan() {
- o := scanner.Object()
- switch o := o.(type) {
- case *osm.Way:
- nodes := make([]int64, 0, len(o.Nodes))
- for _, v := range o.Nodes {
- nodes = append(nodes, int64(v.ID))
- }
-
- w := Way{
- OsmID: int64(o.ID),
- Tags: convertTags(o.Tags),
- Nodes: nodes,
- Timestamp: o.Timestamp,
- Version: o.Version,
- Visible: o.Visible,
- }
- if initial {
- um := mongo.NewInsertOneModel()
- um.SetDocument(w)
- bufferWays = append(bufferWays, um)
- } else {
- um := mongo.NewUpdateOneModel()
- um.SetUpsert(true)
- um.SetUpdate(w)
- um.SetFilter(bson.M{"osm_id": w.OsmID})
- bufferWays = append(bufferWays, um)
- }
- wc++
- case *osm.Node:
- n := Node{
- OsmID: int64(o.ID),
- Location: Coords{
- Type: "Point",
- Coordinates: []float64{
- o.Lon,
- o.Lat,
- }},
- Tags: convertTags(o.Tags),
- Version: o.Version,
- Timestamp: o.Timestamp,
- Visible: o.Visible,
- }
- if initial {
- um := mongo.NewInsertOneModel()
- um.SetDocument(n)
- bufferNodes = append(bufferNodes, um)
- } else {
- um := mongo.NewUpdateOneModel()
- um.SetUpsert(true)
- um.SetUpdate(n)
- um.SetFilter(bson.M{"osm_id": n.OsmID})
- bufferNodes = append(bufferNodes, um)
- }
- nc++
- case *osm.Relation:
- members := make([]Member, len(o.Members))
- for _, v := range o.Members {
- members = append(members, Member{
- Type: v.Type,
- Version: v.Version,
- Orientation: v.Orientation,
- Ref: v.Ref,
- Role: v.Role,
- Location: Coords{
- Type: "Point",
- Coordinates: []float64{
- v.Lon,
- v.Lat,
- }},
- })
- }
- r := Relation{
- OsmID: int64(o.ID),
- Tags: convertTags(o.Tags),
- Version: o.Version,
- Timestamp: o.Timestamp,
- Visible: o.Visible,
- Members: members,
- }
- if initial {
- um := mongo.NewInsertOneModel()
- um.SetDocument(r)
- bufferRelations = append(bufferRelations, um)
- } else {
- um := mongo.NewUpdateOneModel()
- um.SetUpsert(true)
- um.SetUpdate(r)
- um.SetFilter(bson.M{"osm_id": r.OsmID})
- bufferRelations = append(bufferRelations, um)
- }
- rc++
- }
- if len(bufferNodes) == blockSize {
- if _, err := nodes.BulkWrite(context.Background(), bufferNodes, opts); err != nil {
- return err
- }
- bufferNodes = make([]mongo.WriteModel, 0, blockSize)
- log.Printf("Nodes: %d Ways: %d Relations: %d", nc, wc, rc)
- }
- if len(bufferWays) == blockSize {
- if _, err := nodes.BulkWrite(context.Background(), bufferWays, opts); err != nil {
- return err
- }
- bufferWays = make([]mongo.WriteModel, 0, blockSize)
- log.Printf("Nodes: %d Ways: %d Relations: %d", nc, wc, rc)
- }
- if len(bufferRelations) == blockSize {
- if _, err := nodes.BulkWrite(context.Background(), bufferRelations, opts); err != nil {
- return err
- }
- bufferRelations = make([]mongo.WriteModel, 0, blockSize)
- log.Printf("Nodes: %d Ways: %d Relations: %d", nc, wc, rc)
- }
- }
- if len(bufferNodes) != 0 {
- if _, err := nodes.BulkWrite(context.Background(), bufferNodes, opts); err != nil {
- return err
- }
- log.Printf("Nodes: %d Ways: %d Relations: %d", nc, wc, rc)
- }
- if len(bufferWays) != 0 {
- if _, err := ways.BulkWrite(context.Background(), bufferWays, opts); err != nil {
- return err
- }
- log.Printf("Nodes: %d Ways: %d Relations: %d", nc, wc, rc)
- }
- if len(bufferRelations) != 0 {
- if _, err := relations.BulkWrite(context.Background(), bufferRelations, opts); err != nil {
- return err
- }
- log.Printf("Nodes: %d Ways: %d Relations: %d", nc, wc, rc)
- }
- log.Println("Import done")
- scanErr := scanner.Err()
- if scanErr != nil {
- return scanErr
- }
- return nil
-}
-
-func createIndexes(db *mongo.Database) error {
- opts := options.CreateIndexes().SetMaxTime(1000)
- nodes := db.Collection("nodes")
- log.Println("creating indexes for nodes")
- created, err := nodes.Indexes().CreateMany(context.Background(), []mongo.IndexModel{
- {
- Keys: bsonx.Doc{{"osm_id", bsonx.Int32(-1)}},
- Options: (options.Index()).SetBackground(true).SetSparse(true).SetUnique(false),
- },
- {
- Keys: bsonx.Doc{
- {"tags.key", bsonx.Int32(-1)},
- {"tags.value", bsonx.Int32(-1)},
- },
- Options: (options.Index()).SetBackground(true).SetSparse(true),
- },
- }, opts)
- if err != nil {
- return err
- }
- log.Println(created)
- log.Println("creating geoindexes for nodes")
- if err := geoIndex(nodes, "location"); err != nil {
- return err
- }
-
- log.Println("creating indexes for ways")
- ways := db.Collection("ways")
- created, err = ways.Indexes().CreateMany(context.Background(), []mongo.IndexModel{
- {
- Keys: bsonx.Doc{{"osm_id", bsonx.Int32(-1)}},
- Options: (options.Index()).SetBackground(true).SetSparse(true).SetUnique(false),
- },
- {
- Keys: bsonx.Doc{
- {"tags.key", bsonx.Int32(-1)},
- {"tags.value", bsonx.Int32(-1)},
- },
- Options: (options.Index()).SetBackground(true).SetSparse(true),
- },
- }, opts)
- if err != nil {
- return err
- }
- log.Println(created)
- relations := db.Collection("relations")
- log.Println("creating geoindexes for relations")
- created, err = relations.Indexes().CreateMany(context.Background(), []mongo.IndexModel{
- {
- Keys: bsonx.Doc{{"osm_id", bsonx.Int32(-1)}},
- Options: (options.Index()).SetBackground(true).SetSparse(true).SetUnique(false),
- },
- {
- Keys: bsonx.Doc{
- {"tags.key", bsonx.Int32(-1)},
- {"tags.value", bsonx.Int32(-1)},
- },
- Options: (options.Index()).SetBackground(true).SetSparse(true),
- },
- {
- Keys: bsonx.Doc{{"members.ref", bsonx.Int32(-1)}},
- Options: (options.Index()).SetBackground(true).SetSparse(true),
- },
- }, opts)
- if err != nil {
- return err
- }
- log.Println(created)
- if err := geoIndex(relations, "members.coords"); err != nil {
- return err
- }
- log.Println("indexes created")
- return nil
-}
-
-func convertTags(tags osm.Tags) []Tag {
- result := make([]Tag, 0, len(tags))
- for _, t := range tags {
- result = append(result, Tag{
- Key: t.Key,
- Value: t.Value,
+ nodesCh := make(chan Node, 1)
+ waysCh := make(chan Way, 1)
+ relationsCh := make(chan Relation, 1)
+
+ for i := 0; i < *concurrency; i++ {
+ worker := i
+ r.Go(func(ctx context.Context) error {
+ return write(ctx, db, nodesCh, waysCh, relationsCh, *initial, *blockSize, worker)
})
}
- return result
-}
-func simpleIndex(col *mongo.Collection, keys []string, unique bool) error {
- idxKeys := bsonx.Doc{}
- for _, e := range keys {
- idxKeys.Append(e, bsonx.Int32(1))
+ r.Go(func(ctx context.Context) error {
+ return read(ctx, *osmfile, nodesCh, waysCh, relationsCh, *concurrency, layers)
+ })
+ if err := r.Wait(); err != nil {
+ log.Fatal(err)
}
- _, err := col.Indexes().CreateOne(
- context.Background(),
- mongo.IndexModel{
- Keys: idxKeys,
- Options: options.Index().SetUnique(unique).SetSparse(true).SetBackground(true),
- },
- )
- return err
-}
-
-func geoIndex(col *mongo.Collection, key string) error {
- _, err := col.Indexes().CreateOne(
- context.Background(),
- mongo.IndexModel{
- Keys: bsonx.Doc{{
- Key: key, Value: bsonx.Int32(1),
- }},
- Options: options.Index().SetSphereVersion(2).SetSparse(true).SetBackground(true),
- },
- )
- return err
}
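
The write and read functions that the reworked main() launches are not included in this diff. As a rough illustration of the consumer side, the following is a minimal sketch of what one of the write workers could look like, reconstructed from the per-collection buffering and unordered BulkWrite logic removed from the old read() above. The per-collection buffer map, the $set upsert form, and the per-worker log line are assumptions, not the project's actual implementation; Node, Way and Relation are the package types already used by the removed code.

// Sketch only: a write worker that drains the three channels, buffers up to
// blockSize write models per collection, and flushes each buffer with an
// unordered bulk write, mirroring the behaviour of the removed read().
package main

import (
	"context"
	"log"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)

func write(ctx context.Context, db *mongo.Database, nodes <-chan Node, ways <-chan Way, relations <-chan Relation, initial bool, blockSize int, worker int) error {
	opts := options.BulkWrite().SetOrdered(false)
	buffers := map[string][]mongo.WriteModel{}

	// flush writes one collection's buffered models and resets its buffer.
	flush := func(col string) error {
		buf := buffers[col]
		if len(buf) == 0 {
			return nil
		}
		if _, err := db.Collection(col).BulkWrite(ctx, buf, opts); err != nil {
			return err
		}
		log.Printf("worker %d: wrote %d models to %s", worker, len(buf), col)
		buffers[col] = buf[:0]
		return nil
	}

	// push turns a document into an insert (initial import) or an upsert by
	// osm_id, flushing the collection once its buffer reaches blockSize.
	push := func(col string, osmID int64, doc interface{}) error {
		var m mongo.WriteModel
		if initial {
			m = mongo.NewInsertOneModel().SetDocument(doc)
		} else {
			// UpdateOne requires an operator document, hence the $set wrapper.
			m = mongo.NewUpdateOneModel().
				SetUpsert(true).
				SetFilter(bson.M{"osm_id": osmID}).
				SetUpdate(bson.M{"$set": doc})
		}
		buffers[col] = append(buffers[col], m)
		if len(buffers[col]) < blockSize {
			return nil
		}
		return flush(col)
	}

	// Receive until all three channels are closed; a nil channel blocks forever
	// in select, so each closed channel is simply switched off.
	for nodes != nil || ways != nil || relations != nil {
		select {
		case n, ok := <-nodes:
			if !ok {
				nodes = nil
				continue
			}
			if err := push("nodes", n.OsmID, n); err != nil {
				return err
			}
		case w, ok := <-ways:
			if !ok {
				ways = nil
				continue
			}
			if err := push("ways", w.OsmID, w); err != nil {
				return err
			}
		case r, ok := <-relations:
			if !ok {
				relations = nil
				continue
			}
			if err := push("relations", r.OsmID, r); err != nil {
				return err
			}
		case <-ctx.Done():
			return ctx.Err()
		}
	}

	// Flush anything left over so the tail of the import is not lost.
	for _, col := range []string{"nodes", "ways", "relations"} {
		if err := flush(col); err != nil {
			return err
		}
	}
	return nil
}

Buffering per collection is necessary because a single BulkWrite call targets one collection, and draining all three channels in one select keeps every worker useful no matter which layers the reader actually emits.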
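
On the producer side, a matching sketch of read, again an assumption pieced together from the scanner loop this commit removes: it opens the PBF file, scans it with the same osmpbf scanner and concurrency setting, skips layers that were not requested via the new -layers flag, and closes the channels at the end so the workers can flush and exit. convertTags and the Member conversion are assumed to live elsewhere in the package now.

// Sketch only: the producing side of the pipeline.
package main

import (
	"context"
	"os"

	"github.com/paulmach/osm"
	"github.com/paulmach/osm/osmpbf"
)

func read(ctx context.Context, file string, nodes chan<- Node, ways chan<- Way, relations chan<- Relation, concurrency int, layers []string) error {
	// Closing the channels is the shutdown signal for the write workers.
	defer close(nodes)
	defer close(ways)
	defer close(relations)

	f, err := os.Open(file)
	if err != nil {
		return err
	}
	defer f.Close()

	want := map[string]bool{}
	for _, l := range layers {
		want[l] = true
	}

	scanner := osmpbf.New(ctx, f, concurrency)
	defer scanner.Close()

	for scanner.Scan() {
		switch o := scanner.Object().(type) {
		case *osm.Node:
			if want["nodes"] {
				nodes <- Node{
					OsmID:     int64(o.ID),
					Location:  Coords{Type: "Point", Coordinates: []float64{o.Lon, o.Lat}},
					Tags:      convertTags(o.Tags),
					Version:   o.Version,
					Timestamp: o.Timestamp,
					Visible:   o.Visible,
				}
			}
		case *osm.Way:
			if want["ways"] {
				ids := make([]int64, 0, len(o.Nodes))
				for _, wn := range o.Nodes {
					ids = append(ids, int64(wn.ID))
				}
				ways <- Way{
					OsmID:     int64(o.ID),
					Nodes:     ids,
					Tags:      convertTags(o.Tags),
					Version:   o.Version,
					Timestamp: o.Timestamp,
					Visible:   o.Visible,
				}
			}
		case *osm.Relation:
			if want["relations"] {
				// Members would be converted exactly as in the removed code.
				relations <- Relation{
					OsmID:     int64(o.ID),
					Tags:      convertTags(o.Tags),
					Version:   o.Version,
					Timestamp: o.Timestamp,
					Visible:   o.Visible,
				}
			}
		}
	}
	return scanner.Err()
}

With the reworked flags, an import run would then look roughly like this (the binary name is a placeholder, and the Mongo connection flag defined earlier in main() is left at its default):

./importer -dbname map -osmfile planet.osm.pbf -initial -layers nodes,ways -concurrency 32 -block 1000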