aboutsummaryrefslogblamecommitdiff
path: root/main.go
blob: 9ec7157386a4908311e649b3a53c5856ea20d4b6 (plain) (tree)
1
2
3
4
5




                 













                                                                                                       
                                                                     
                                                         


                                                                        







                                                                                   






                                                                                      




                              
                                                                                                
                                       


                                                    

                                     

                                                   

                                               



                                                              






                               
                                                                 



               
                                                                   

                             
                                                        







                                                                  
 







                                                               









                                                                       















                                                               









                                                                       


























                                                                       









                                                                       


                            






                                                                                                      
         






                                                                                              













                                                    

























                                                                                                         
package main

import (
	"context"
	"flag"
	"log"
	"os"
	"time"

	"github.com/paulmach/osm"
	"github.com/paulmach/osm/osmpbf"
	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
	"go.mongodb.org/mongo-driver/x/bsonx"
)

// main parses the command-line flags, connects to MongoDB and imports the
// OSM PBF file given by -osmfile into the database given by -dbname.
// -initial selects plain inserts instead of upserts.
func main() {
	dbconnection := flag.String("dbconnection", "mongodb://localhost:27017", "Mongo connection URI")
	dbname := flag.String("dbname", "map", "Mongo database name")
	osmfile := flag.String("osmfile", "", "OSM file")
	initial := flag.Bool("initial", false, "Is initial import")
	concurrency := flag.Int("concurrency", 16, "Concurrency")
	blockSize := flag.Int("block", 1000, "Block size to bulk write")
	flag.Parse()

	// Fail before touching the database rather than after index creation.
	if *osmfile == "" {
		log.Fatal("-osmfile is required")
	}

	// The timeout bounds client setup; cancel releases its timer
	// (previously the cancel func was discarded, which go vet reports).
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	client, err := mongo.Connect(ctx, options.Client().ApplyURI(*dbconnection))
	if err != nil {
		log.Fatal(err)
	}
	db := client.Database(*dbname)

	log.Printf("Started import file %s to db %s", *osmfile, *dbname)
	if *initial {
		log.Println("Initial import")
	} else {
		log.Println("Diff import")
	}
	importErr := read(db, *osmfile, *initial, *concurrency, *blockSize)

	// log.Fatal exits without running deferred calls, so disconnect
	// explicitly before reporting an import failure.
	if err := client.Disconnect(context.Background()); err != nil {
		log.Printf("disconnect: %v", err)
	}
	if importErr != nil {
		log.Fatal(importErr)
	}
}

// read imports the OSM PBF file at path file into db.
//
// Before reading, it requests the indexes on the "nodes", "ways" and
// "relations" collections. Nodes, ways and relations are then written to
// their respective collections. With initial set, every object is inserted.
// Otherwise each object replaces the stored document with the same osm_id,
// or is inserted if no such document exists.
//
// concurrency is passed to osmpbf.New. blockSize is the number of write
// models buffered per collection before an unordered BulkWrite is issued;
// values below 1 are treated as 1.
//
// It returns the first error from opening the file, writing to MongoDB, or
// decoding the PBF stream.
func read(db *mongo.Database, file string, initial bool, concurrency int, blockSize int) error {
	if blockSize < 1 {
		blockSize = 1
	}

	nodes := db.Collection("nodes")
	simpleIndex(nodes, []string{"osm_id"}, true)
	simpleIndex(nodes, []string{"tags"}, false)
	geoIndex(nodes, "location")

	ways := db.Collection("ways")
	simpleIndex(ways, []string{"osm_id"}, true)
	simpleIndex(ways, []string{"tags"}, false)

	relations := db.Collection("relations")
	simpleIndex(relations, []string{"osm_id"}, true)
	simpleIndex(relations, []string{"tags"}, false)
	simpleIndex(relations, []string{"members.ref"}, false)
	geoIndex(relations, "members.coords")

	f, err := os.Open(file)
	if err != nil {
		return err
	}
	defer f.Close()

	ctx := context.Background()
	opts := (new(options.BulkWriteOptions)).SetOrdered(false)
	nc, wc, rc := 0, 0, 0

	// Each collection gets its own buffer so models are written where they
	// belong. A single shared buffer flushed into nodes sent ways and
	// relations to the wrong collection.
	nodeBuf := make([]mongo.WriteModel, 0, blockSize)
	wayBuf := make([]mongo.WriteModel, 0, blockSize)
	relBuf := make([]mongo.WriteModel, 0, blockSize)

	// flush writes the pending models in *buf to col and empties *buf.
	flush := func(col *mongo.Collection, buf *[]mongo.WriteModel) error {
		if len(*buf) == 0 {
			return nil
		}
		if _, err := col.BulkWrite(ctx, *buf, opts); err != nil {
			return err
		}
		*buf = (*buf)[:0]
		log.Printf("Nodes: %d Ways: %d Relations: %d", nc, wc, rc)
		return nil
	}
	// queue appends m to *buf and flushes once blockSize models are waiting.
	queue := func(col *mongo.Collection, buf *[]mongo.WriteModel, m mongo.WriteModel) error {
		*buf = append(*buf, m)
		if len(*buf) < blockSize {
			return nil
		}
		return flush(col, buf)
	}

	scanner := osmpbf.New(ctx, f, concurrency)
	defer scanner.Close()

	for scanner.Scan() {
		var qerr error
		switch o := scanner.Object().(type) {
		case *osm.Node:
			n := Node{
				OsmID: int64(o.ID),
				Location: Coords{
					Type:        "Point",
					Coordinates: []float64{o.Lon, o.Lat},
				},
				Tags:      convertTags(o.Tags),
				Version:   o.Version,
				Timestamp: o.Timestamp,
				Visible:   o.Visible,
			}
			nc++
			qerr = queue(nodes, &nodeBuf, osmWriteModel(initial, n.OsmID, n))
		case *osm.Way:
			refs := make([]int64, 0, len(o.Nodes))
			for _, v := range o.Nodes {
				refs = append(refs, int64(v.ID))
			}
			w := Way{
				OsmID:     int64(o.ID),
				Tags:      convertTags(o.Tags),
				Nodes:     refs,
				Timestamp: o.Timestamp,
				Version:   o.Version,
				Visible:   o.Visible,
			}
			wc++
			qerr = queue(ways, &wayBuf, osmWriteModel(initial, w.OsmID, w))
		case *osm.Relation:
			// Length 0 with capacity: make([]Member, n) followed by append
			// left n zero-valued members at the front.
			members := make([]Member, 0, len(o.Members))
			for _, v := range o.Members {
				members = append(members, Member{
					Type:        v.Type,
					Version:     v.Version,
					Orientation: v.Orientation,
					Ref:         v.Ref,
					Role:        v.Role,
					Location: Coords{
						Type:        "Point",
						Coordinates: []float64{v.Lon, v.Lat},
					},
				})
			}
			r := Relation{
				OsmID:     int64(o.ID),
				Tags:      convertTags(o.Tags),
				Version:   o.Version,
				Timestamp: o.Timestamp,
				Visible:   o.Visible,
				Members:   members,
			}
			rc++
			qerr = queue(relations, &relBuf, osmWriteModel(initial, r.OsmID, r))
		}
		if qerr != nil {
			return qerr
		}
	}
	// Report a decode failure instead of announcing a finished import.
	if err := scanner.Err(); err != nil {
		return err
	}
	if err := flush(nodes, &nodeBuf); err != nil {
		return err
	}
	if err := flush(ways, &wayBuf); err != nil {
		return err
	}
	if err := flush(relations, &relBuf); err != nil {
		return err
	}
	log.Println("Import done")
	return nil
}

// osmWriteModel builds the bulk-write model for one OSM document. During an
// initial import it is a plain insert. Otherwise it is an upserting
// replacement keyed on osm_id. A replacement is used because an UpdateOne
// model only accepts update documents made of $-operators, so passing the
// bare document as an update is rejected.
func osmWriteModel(initial bool, id int64, doc interface{}) mongo.WriteModel {
	if initial {
		return mongo.NewInsertOneModel().SetDocument(doc)
	}
	return mongo.NewReplaceOneModel().
		SetFilter(bson.M{"osm_id": id}).
		SetReplacement(doc).
		SetUpsert(true)
}

// convertTags flattens an OSM tag list into a key-to-value map. If a key
// appears more than once, the value occurring last in tags wins.
func convertTags(tags osm.Tags) map[string]string {
	out := make(map[string]string, len(tags))
	for i := range tags {
		tag := tags[i]
		out[tag.Key] = tag.Value
	}
	return out
}

// simpleIndex requests an ascending index over keys on col. When more than
// one key is given, the keys form a single compound index in the given
// order. The index is sparse and built in the background, and unique adds a
// uniqueness constraint. Creation failures are logged rather than returned,
// so the import can continue without the index.
func simpleIndex(col *mongo.Collection, keys []string, unique bool) {
	idxKeys := make(bsonx.Doc, 0, len(keys))
	for _, k := range keys {
		// Doc.Append returns the extended document and does not modify its
		// receiver. Discarding the result left the key spec empty, so no
		// index was ever created.
		idxKeys = idxKeys.Append(k, bsonx.Int32(1))
	}
	_, err := col.Indexes().CreateOne(
		context.Background(),
		mongo.IndexModel{
			Keys:    idxKeys,
			Options: options.Index().SetUnique(unique).SetSparse(true).SetBackground(true),
		},
	)
	if err != nil {
		log.Printf("create index %v on %s: %v", keys, col.Name(), err)
	}
}

// geoIndex requests a 2dsphere (version 2) index on the GeoJSON field key of
// col. The index is built in the background. Creation failures are logged
// rather than returned, so the import can continue without the index.
func geoIndex(col *mongo.Collection, key string) {
	_, err := col.Indexes().CreateOne(
		context.Background(),
		mongo.IndexModel{
			// The index type is selected by the key's value: "2dsphere"
			// makes it a geospatial index. The previous value 1 built a
			// plain ascending index, despite the sphere-version option.
			Keys: bsonx.Doc{{
				Key: key, Value: bsonx.String("2dsphere"),
			}},
			Options: options.Index().SetSphereVersion(2).SetSparse(true).SetBackground(true),
		},
	)
	if err != nil {
		log.Printf("create 2dsphere index %q on %s: %v", key, col.Name(), err)
	}
}