From 2f4b6e90597784a8a7c01027e0ff5c6b69634a96 Mon Sep 17 00:00:00 2001 From: Alexander NeonXP Kiryukhin Date: Thu, 30 May 2019 14:05:08 +0300 Subject: Parallel write Speedup --- README.md | 20 ++-- go.mod | 2 + go.sum | 4 + helpers.go | 14 +++ indexes.go | 96 +++++++++++++++++++ main.go | 306 +++++-------------------------------------------------------- models.go | 60 ++++++------ reader.go | 114 +++++++++++++++++++++++ writer.go | 101 ++++++++++++++++++++ 9 files changed, 394 insertions(+), 323 deletions(-) create mode 100644 helpers.go create mode 100644 indexes.go create mode 100644 reader.go create mode 100644 writer.go diff --git a/README.md b/README.md index df0d88d..72eb5c8 100644 --- a/README.md +++ b/README.md @@ -8,14 +8,18 @@ Simple loader from osm dump file to mongodb. Based on https://github.com/paulmac ## Usage -`./osm2go -osmfile PATH_TO_OSM_FILE [-dbconnection mongodb://localhost:27017] [-dbname osm] [-initial=true] [-concurrency=16] [-block=1000]` - -* `osmfile` required, path to *.osm or *.osm.pbf file -* `dbconnection` optional, mongodb connection string (default: `mongodb://localhost:27017`) -* `dbname` optional, mongodb database name (default: `osm`) -* `initial` optional, use insert instead upsert. Faster, but not check if item exists (default: `false`) -* `concurrency` optional, parallel read processes (default, `16`) -* `block` optional, block size to bulk write (default: `1000`) +`./osm2go -osmfile=PATH_TO_OSM_FILE` + +All flags: + +* `-osmfile` (required) OSM file +* `-initial` (default:false) Is initial import (uses insert, not upsert) +* `-indexes` (default:false) Create indexes (needs only first time) +* `-dbconnection` (default:"mongodb://localhost:27017") Mongo database name +* `-dbname` (default:"map") Mongo database name +* `-layers` (default:"nodes,ways,relations") Layers to import +* `-block` (default:1000) Block size to bulk write +* `-concurrency` (default:32) Concurrency read and write ## Example diff --git a/go.mod b/go.mod index 06c3118..96aaa12 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,8 @@ require ( github.com/gogo/protobuf v1.2.1 // indirect github.com/golang/snappy v0.0.1 // indirect github.com/google/go-cmp v0.3.0 // indirect + github.com/neonxp/rutina v0.4.3 + github.com/paulmach/go.geojson v1.4.0 github.com/paulmach/orb v0.1.3 github.com/paulmach/osm v0.0.1 github.com/stretchr/testify v1.3.0 // indirect diff --git a/go.sum b/go.sum index 419da2d..744a494 100644 --- a/go.sum +++ b/go.sum @@ -10,6 +10,10 @@ github.com/google/go-cmp v0.3.0 h1:crn/baboCvb5fXaQ0IJ1SGTsTVrWpDsCWC8EGETZijY= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/neonxp/rutina v0.4.3 h1:It7wu2L1FlPMC7UFGS7cTdMPWqd2hwL6+xP8UP72xZc= +github.com/neonxp/rutina v0.4.3/go.mod h1:QJOHIcMI4Lh4Nyyi0v119KZllW1S5KxJyy/zg5KQXno= +github.com/paulmach/go.geojson v1.4.0 h1:5x5moCkCtDo5x8af62P9IOAYGQcYHtxz2QJ3x1DoCgY= +github.com/paulmach/go.geojson v1.4.0/go.mod h1:YaKx1hKpWF+T2oj2lFJPsW/t1Q5e1jQI61eoQSTwpIs= github.com/paulmach/orb v0.1.3 h1:Wa1nzU269Zv7V9paVEY1COWW8FCqv4PC/KJRbJSimpM= github.com/paulmach/orb v0.1.3/go.mod h1:VFlX/8C+IQ1p6FTRRKzKoOPJnvEtA5G0Veuqwbu//Vk= github.com/paulmach/osm v0.0.1 h1:TxU/uZnJ+ssntblY6kSieOP4tQv31VW8wfyf4L3BeKs= diff --git a/helpers.go b/helpers.go new file mode 100644 index 0000000..b8672ac --- /dev/null +++ b/helpers.go @@ -0,0 +1,14 @@ +package main + +import "github.com/paulmach/osm" + +func convertTags(tags osm.Tags) []Tag { + result := make([]Tag, 0, len(tags)) + for _, t := range tags { + result = append(result, Tag{ + Key: t.Key, + Value: t.Value, + }) + } + return result +} diff --git a/indexes.go b/indexes.go new file mode 100644 index 0000000..d8dbb32 --- /dev/null +++ b/indexes.go @@ -0,0 +1,96 @@ +package main + +import ( + "context" + "log" + + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" + "go.mongodb.org/mongo-driver/x/bsonx" +) + +func createIndexes(db *mongo.Database) error { + opts := options.CreateIndexes().SetMaxTime(1000) + nodes := db.Collection("nodes") + log.Println("creating indexes for nodes") + created, err := nodes.Indexes().CreateMany(context.Background(), []mongo.IndexModel{ + { + Keys: bsonx.Doc{{"osm_id", bsonx.Int32(-1)}}, + Options: (options.Index()).SetBackground(true).SetSparse(true).SetUnique(false), + }, { + Keys: bsonx.Doc{{"osm_id", bsonx.Int32(-1)}, {"version", bsonx.Int32(-1)}}, + Options: (options.Index()).SetBackground(true).SetSparse(true).SetUnique(false), + }, { + Keys: bsonx.Doc{{"tags", bsonx.Int32(-1)}}, + Options: (options.Index()).SetBackground(true).SetSparse(true), + }, + }, opts) + if err != nil { + return err + } + log.Println(created) + log.Println("creating geoindexes for nodes") + if err := geoIndex(nodes, "location"); err != nil { + return err + } + + log.Println("creating indexes for ways") + ways := db.Collection("ways") + created, err = ways.Indexes().CreateMany(context.Background(), []mongo.IndexModel{ + { + Keys: bsonx.Doc{{"osm_id", bsonx.Int32(-1)}}, + Options: (options.Index()).SetBackground(true).SetSparse(true).SetUnique(false), + }, { + Keys: bsonx.Doc{{"osm_id", bsonx.Int32(-1)}, {"version", bsonx.Int32(-1)}}, + Options: (options.Index()).SetBackground(true).SetSparse(true).SetUnique(false), + }, { + Keys: bsonx.Doc{{"tags", bsonx.Int32(-1)}}, + Options: (options.Index()).SetBackground(true).SetSparse(true), + }, + }, opts) + if err != nil { + return err + } + log.Println(created) + + relations := db.Collection("relations") + log.Println("creating geoindexes for relations") + created, err = relations.Indexes().CreateMany(context.Background(), []mongo.IndexModel{ + { + Keys: bsonx.Doc{{"osm_id", bsonx.Int32(-1)}}, + Options: (options.Index()).SetBackground(true).SetSparse(true).SetUnique(false), + }, { + Keys: bsonx.Doc{{"osm_id", bsonx.Int32(-1)}, {"version", bsonx.Int32(-1)}}, + Options: (options.Index()).SetBackground(true).SetSparse(true).SetUnique(false), + }, { + Keys: bsonx.Doc{{"tags", bsonx.Int32(-1)}}, + Options: (options.Index()).SetBackground(true).SetSparse(true), + }, + { + Keys: bsonx.Doc{{"members.ref", bsonx.Int32(-1)}}, + Options: (options.Index()).SetBackground(true).SetSparse(true), + }, + }, opts) + if err != nil { + return err + } + log.Println(created) + if err := geoIndex(relations, "members.coords"); err != nil { + return err + } + log.Println("indexes created") + return nil +} + +func geoIndex(col *mongo.Collection, key string) error { + _, err := col.Indexes().CreateOne( + context.Background(), + mongo.IndexModel{ + Keys: bsonx.Doc{{ + Key: key, Value: bsonx.String("2dsphere"), + }}, + Options: options.Index().SetSphereVersion(2).SetSparse(true).SetBackground(true), + }, + ) + return err +} diff --git a/main.go b/main.go index df5dcc1..e5dab33 100644 --- a/main.go +++ b/main.go @@ -4,15 +4,13 @@ import ( "context" "flag" "log" - "os" + "strings" "time" - "github.com/paulmach/osm" - "github.com/paulmach/osm/osmpbf" - "go.mongodb.org/mongo-driver/bson" + "github.com/neonxp/rutina" + _ "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" - "go.mongodb.org/mongo-driver/x/bsonx" ) func main() { @@ -20,10 +18,13 @@ func main() { dbname := flag.String("dbname", "map", "Mongo database name") osmfile := flag.String("osmfile", "", "OSM file") initial := flag.Bool("initial", false, "Is initial import") - indexes := flag.Bool("indexes", false, "Just create indexes") - concurrency := flag.Int("concurrency", 16, "Concurrency") + indexes := flag.Bool("indexes", false, "Create indexes") + layersString := flag.String("layers", "nodes,ways,relations", "Layers to import") blockSize := flag.Int("block", 1000, "Block size to bulk write") + concurrency := flag.Int("concurrency", 32, "Concurrency read and write") flag.Parse() + layers := strings.Split(*layersString, ",") + r := rutina.New() ctx, _ := context.WithTimeout(context.Background(), 10*time.Second) client, err := mongo.Connect(ctx, options.Client().ApplyURI(*dbconnection)) if err != nil { @@ -37,289 +38,24 @@ func main() { log.Fatal(err) } log.Println("Indexes created") - return } log.Printf("Started import file %s to db %s", *osmfile, *dbname) - - if err := read(db, *osmfile, *initial, *concurrency, *blockSize); err != nil { - log.Fatal(err) - } -} - -func read(db *mongo.Database, file string, initial bool, concurrency int, blockSize int) error { - nodes := db.Collection("nodes") - ways := db.Collection("ways") - relations := db.Collection("relations") - - f, err := os.Open(file) - if err != nil { - return err - } - defer f.Close() - - opts := (new(options.BulkWriteOptions)).SetOrdered(false) - nc := 0 - wc := 0 - rc := 0 - - scanner := osmpbf.New(context.Background(), f, concurrency) - defer scanner.Close() - - bufferNodes := make([]mongo.WriteModel, 0, blockSize) - bufferWays := make([]mongo.WriteModel, 0, blockSize) - bufferRelations := make([]mongo.WriteModel, 0, blockSize) - for scanner.Scan() { - o := scanner.Object() - switch o := o.(type) { - case *osm.Way: - nodes := make([]int64, 0, len(o.Nodes)) - for _, v := range o.Nodes { - nodes = append(nodes, int64(v.ID)) - } - - w := Way{ - OsmID: int64(o.ID), - Tags: convertTags(o.Tags), - Nodes: nodes, - Timestamp: o.Timestamp, - Version: o.Version, - Visible: o.Visible, - } - if initial { - um := mongo.NewInsertOneModel() - um.SetDocument(w) - bufferWays = append(bufferWays, um) - } else { - um := mongo.NewUpdateOneModel() - um.SetUpsert(true) - um.SetUpdate(w) - um.SetFilter(bson.M{"osm_id": w.OsmID}) - bufferWays = append(bufferWays, um) - } - wc++ - case *osm.Node: - n := Node{ - OsmID: int64(o.ID), - Location: Coords{ - Type: "Point", - Coordinates: []float64{ - o.Lon, - o.Lat, - }}, - Tags: convertTags(o.Tags), - Version: o.Version, - Timestamp: o.Timestamp, - Visible: o.Visible, - } - if initial { - um := mongo.NewInsertOneModel() - um.SetDocument(n) - bufferNodes = append(bufferNodes, um) - } else { - um := mongo.NewUpdateOneModel() - um.SetUpsert(true) - um.SetUpdate(n) - um.SetFilter(bson.M{"osm_id": n.OsmID}) - bufferNodes = append(bufferNodes, um) - } - nc++ - case *osm.Relation: - members := make([]Member, len(o.Members)) - for _, v := range o.Members { - members = append(members, Member{ - Type: v.Type, - Version: v.Version, - Orientation: v.Orientation, - Ref: v.Ref, - Role: v.Role, - Location: Coords{ - Type: "Point", - Coordinates: []float64{ - v.Lon, - v.Lat, - }}, - }) - } - r := Relation{ - OsmID: int64(o.ID), - Tags: convertTags(o.Tags), - Version: o.Version, - Timestamp: o.Timestamp, - Visible: o.Visible, - Members: members, - } - if initial { - um := mongo.NewInsertOneModel() - um.SetDocument(r) - bufferRelations = append(bufferRelations, um) - } else { - um := mongo.NewUpdateOneModel() - um.SetUpsert(true) - um.SetUpdate(r) - um.SetFilter(bson.M{"osm_id": r.OsmID}) - bufferRelations = append(bufferRelations, um) - } - rc++ - } - if len(bufferNodes) == blockSize { - if _, err := nodes.BulkWrite(context.Background(), bufferNodes, opts); err != nil { - return err - } - bufferNodes = make([]mongo.WriteModel, 0, blockSize) - log.Printf("Nodes: %d Ways: %d Relations: %d", nc, wc, rc) - } - if len(bufferWays) == blockSize { - if _, err := nodes.BulkWrite(context.Background(), bufferWays, opts); err != nil { - return err - } - bufferWays = make([]mongo.WriteModel, 0, blockSize) - log.Printf("Nodes: %d Ways: %d Relations: %d", nc, wc, rc) - } - if len(bufferRelations) == blockSize { - if _, err := nodes.BulkWrite(context.Background(), bufferRelations, opts); err != nil { - return err - } - bufferRelations = make([]mongo.WriteModel, 0, blockSize) - log.Printf("Nodes: %d Ways: %d Relations: %d", nc, wc, rc) - } - } - if len(bufferNodes) != 0 { - if _, err := nodes.BulkWrite(context.Background(), bufferNodes, opts); err != nil { - return err - } - log.Printf("Nodes: %d Ways: %d Relations: %d", nc, wc, rc) - } - if len(bufferWays) != 0 { - if _, err := ways.BulkWrite(context.Background(), bufferWays, opts); err != nil { - return err - } - log.Printf("Nodes: %d Ways: %d Relations: %d", nc, wc, rc) - } - if len(bufferRelations) != 0 { - if _, err := relations.BulkWrite(context.Background(), bufferRelations, opts); err != nil { - return err - } - log.Printf("Nodes: %d Ways: %d Relations: %d", nc, wc, rc) - } - log.Println("Import done") - scanErr := scanner.Err() - if scanErr != nil { - return scanErr - } - return nil -} - -func createIndexes(db *mongo.Database) error { - opts := options.CreateIndexes().SetMaxTime(1000) - nodes := db.Collection("nodes") - log.Println("creating indexes for nodes") - created, err := nodes.Indexes().CreateMany(context.Background(), []mongo.IndexModel{ - { - Keys: bsonx.Doc{{"osm_id", bsonx.Int32(-1)}}, - Options: (options.Index()).SetBackground(true).SetSparse(true).SetUnique(false), - }, - { - Keys: bsonx.Doc{ - {"tags.key", bsonx.Int32(-1)}, - {"tags.value", bsonx.Int32(-1)}, - }, - Options: (options.Index()).SetBackground(true).SetSparse(true), - }, - }, opts) - if err != nil { - return err - } - log.Println(created) - log.Println("creating geoindexes for nodes") - if err := geoIndex(nodes, "location"); err != nil { - return err - } - - log.Println("creating indexes for ways") - ways := db.Collection("ways") - created, err = ways.Indexes().CreateMany(context.Background(), []mongo.IndexModel{ - { - Keys: bsonx.Doc{{"osm_id", bsonx.Int32(-1)}}, - Options: (options.Index()).SetBackground(true).SetSparse(true).SetUnique(false), - }, - { - Keys: bsonx.Doc{ - {"tags.key", bsonx.Int32(-1)}, - {"tags.value", bsonx.Int32(-1)}, - }, - Options: (options.Index()).SetBackground(true).SetSparse(true), - }, - }, opts) - if err != nil { - return err - } - log.Println(created) - relations := db.Collection("relations") - log.Println("creating geoindexes for relations") - created, err = relations.Indexes().CreateMany(context.Background(), []mongo.IndexModel{ - { - Keys: bsonx.Doc{{"osm_id", bsonx.Int32(-1)}}, - Options: (options.Index()).SetBackground(true).SetSparse(true).SetUnique(false), - }, - { - Keys: bsonx.Doc{ - {"tags.key", bsonx.Int32(-1)}, - {"tags.value", bsonx.Int32(-1)}, - }, - Options: (options.Index()).SetBackground(true).SetSparse(true), - }, - { - Keys: bsonx.Doc{{"members.ref", bsonx.Int32(-1)}}, - Options: (options.Index()).SetBackground(true).SetSparse(true), - }, - }, opts) - if err != nil { - return err - } - log.Println(created) - if err := geoIndex(relations, "members.coords"); err != nil { - return err - } - log.Println("indexes created") - return nil -} - -func convertTags(tags osm.Tags) []Tag { - result := make([]Tag, 0, len(tags)) - for _, t := range tags { - result = append(result, Tag{ - Key: t.Key, - Value: t.Value, + nodesCh := make(chan Node, 1) + waysCh := make(chan Way, 1) + relationsCh := make(chan Relation, 1) + + for i := 0; i < *concurrency; i++ { + worker := i + r.Go(func(ctx context.Context) error { + return write(ctx, db, nodesCh, waysCh, relationsCh, *initial, *blockSize, worker) }) } - return result -} -func simpleIndex(col *mongo.Collection, keys []string, unique bool) error { - idxKeys := bsonx.Doc{} - for _, e := range keys { - idxKeys.Append(e, bsonx.Int32(1)) + r.Go(func(ctx context.Context) error { + return read(ctx, *osmfile, nodesCh, waysCh, relationsCh, *concurrency, layers) + }) + if err := r.Wait(); err != nil { + log.Fatal(err) } - _, err := col.Indexes().CreateOne( - context.Background(), - mongo.IndexModel{ - Keys: idxKeys, - Options: options.Index().SetUnique(unique).SetSparse(true).SetBackground(true), - }, - ) - return err -} - -func geoIndex(col *mongo.Collection, key string) error { - _, err := col.Indexes().CreateOne( - context.Background(), - mongo.IndexModel{ - Keys: bsonx.Doc{{ - Key: key, Value: bsonx.Int32(1), - }}, - Options: options.Index().SetSphereVersion(2).SetSparse(true).SetBackground(true), - }, - ) - return err } diff --git a/models.go b/models.go index c98ed2f..dd187d2 100644 --- a/models.go +++ b/models.go @@ -9,54 +9,54 @@ import ( ) type Coords struct { - Type string `bson:"type"` - Coordinates []float64 `bson:"coordinates"` + Type string `json:"type" bson:"type"` + Coordinates []float64 `json:"coordinates" bson:"coordinates"` } type Node struct { - ID primitive.ObjectID `bson:"_id,omitempty"` - OsmID int64 `bson:"osm_id"` - Visible bool `bson:"visible"` - Version int `bson:"version,omitempty"` - Timestamp time.Time `bson:"timestamp"` - Tags []Tag `bson:"tags,omitempty"` - Location Coords `bson:"location"` + ID primitive.ObjectID `json:"_id,omitempty" bson:"_id,omitempty"` + OsmID int64 `json:"osm_id" bson:"osm_id"` + Visible bool `json:"visible" bson:"visible"` + Version int `json:"version,omitempty" bson:"version,omitempty"` + Timestamp time.Time `json:"timestamp" bson:"timestamp"` + Tags []Tag `json:"tags,omitempty" bson:"tags,omitempty"` + Location Coords `json:"location" bson:"location"` } type Way struct { - ID primitive.ObjectID `bson:"_id,omitempty"` - OsmID int64 `bson:"osm_id"` - Visible bool `bson:"visible"` - Version int `bson:"version"` - Timestamp time.Time `bson:"timestamp"` - Nodes []int64 `bson:"nodes"` - Tags []Tag `bson:"tags"` + ID primitive.ObjectID `json:"_id,omitempty" bson:"_id,omitempty"` + OsmID int64 `json:"osm_id" bson:"osm_id"` + Visible bool `json:"visible" bson:"visible"` + Version int `json:"version" bson:"version"` + Timestamp time.Time `json:"timestamp" bson:"timestamp"` + Nodes []int64 `json:"nodes" bson:"nodes"` + Tags []Tag `json:"tags" bson:"tags"` } type Relation struct { - ID primitive.ObjectID `bson:"_id,omitempty"` - OsmID int64 `bson:"osm_id"` - Visible bool `bson:"visible"` - Version int `bson:"version"` - Timestamp time.Time `bson:"timestamp"` - Members []Member `bson:"members"` - Tags []Tag `bson:"tags"` + ID primitive.ObjectID `json:"_id,omitempty" bson:"_id,omitempty"` + OsmID int64 `json:"osm_id" bson:"osm_id"` + Visible bool `json:"visible" bson:"visible"` + Version int `json:"version" bson:"version"` + Timestamp time.Time `json:"timestamp" bson:"timestamp"` + Members []Member `json:"members" bson:"members"` + Tags []Tag `json:"tags" bson:"tags"` } type Member struct { - Type osm.Type `bson:"type"` - Ref int64 `bson:"ref"` - Role string `bson:"role"` + Type osm.Type `json:"type" bson:"type"` + Ref int64 `json:"ref" bson:"ref"` + Role string `json:"role" bson:"role"` Version int - Location Coords `bson:"location"` + Location *Coords `json:"location,omitempty" bson:"location,omitempty"` // Orientation is the direction of the way around a ring of a multipolygon. // Only valid for multipolygon or boundary relations. - Orientation orb.Orientation `bson:"orienation,omitempty"` + Orientation orb.Orientation `json:"orienation,omitempty" bson:"orienation,omitempty"` } type Tag struct { - Key string `bson:"key"` - Value string `bson:"value"` + Key string `json:"key" bson:"key"` + Value string `json:"value" bson:"value"` } diff --git a/reader.go b/reader.go new file mode 100644 index 0000000..5ab603c --- /dev/null +++ b/reader.go @@ -0,0 +1,114 @@ +package main + +import ( + "context" + "log" + "os" + + "github.com/paulmach/osm" + "github.com/paulmach/osm/osmpbf" +) + +func read(ctx context.Context, file string, nodesCh chan Node, waysCh chan Way, relationsCh chan Relation, concurrency int, layers []string) error { + f, err := os.Open(file) + if err != nil { + return err + } + defer f.Close() + scanner := osmpbf.New(context.Background(), f, concurrency) + defer scanner.Close() + + layersToImport := map[string]bool{ + "ways": false, + "nodes": false, + "relations": false, + } + + for _, l := range layers { + layersToImport[l] = true + } + + for scanner.Scan() { + if ctx.Err() != nil { + return ctx.Err() + } + o := scanner.Object() + switch o := o.(type) { + case *osm.Way: + if !layersToImport["ways"] { + continue + } + nodes := make([]int64, 0, len(o.Nodes)) + for _, v := range o.Nodes { + nodes = append(nodes, int64(v.ID)) + } + + w := Way{ + OsmID: int64(o.ID), + Tags: convertTags(o.Tags), + Nodes: nodes, + Timestamp: o.Timestamp, + Version: o.Version, + Visible: o.Visible, + } + waysCh <- w + case *osm.Node: + if !layersToImport["nodes"] { + continue + } + n := Node{ + OsmID: int64(o.ID), + Location: Coords{ + Type: "Point", + Coordinates: []float64{ + o.Lon, + o.Lat, + }}, + Tags: convertTags(o.Tags), + Version: o.Version, + Timestamp: o.Timestamp, + Visible: o.Visible, + } + nodesCh <- n + case *osm.Relation: + if !layersToImport["relations"] { + continue + } + members := make([]Member, 0, len(o.Members)) + for _, v := range o.Members { + var location *Coords + if v.Lat != 0.0 && v.Lon != 0.0 { + location = &Coords{ + Type: "Point", + Coordinates: []float64{ + v.Lon, + v.Lat, + }} + } + members = append(members, Member{ + Type: v.Type, + Version: v.Version, + Orientation: v.Orientation, + Ref: v.Ref, + Role: v.Role, + Location: location, + }) + } + r := Relation{ + OsmID: int64(o.ID), + Tags: convertTags(o.Tags), + Version: o.Version, + Timestamp: o.Timestamp, + Visible: o.Visible, + Members: members, + } + relationsCh <- r + } + } + log.Println("Read done") + scanErr := scanner.Err() + if scanErr != nil { + return scanErr + } + return nil +} diff --git a/writer.go b/writer.go new file mode 100644 index 0000000..fe82656 --- /dev/null +++ b/writer.go @@ -0,0 +1,101 @@ +package main + +import ( + "context" + "log" + + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" +) + +func write(ctx context.Context, db *mongo.Database, nodesCh chan Node, waysCh chan Way, relationsCh chan Relation, initial bool, blockSize int, worker int) error { + nodes := db.Collection("nodes") + ways := db.Collection("ways") + relations := db.Collection("relations") + opts := (new(options.BulkWriteOptions)).SetOrdered(false) + nodesBuffer := make([]mongo.WriteModel, 0, blockSize) + waysBuffer := make([]mongo.WriteModel, 0, blockSize) + relationsBuffer := make([]mongo.WriteModel, 0, blockSize) + nc := 0 + wc := 0 + rc := 0 + for { + select { + case w := <-waysCh: + if initial { + um := mongo.NewInsertOneModel() + um.SetDocument(w) + waysBuffer = append(waysBuffer, um) + } else { + um := mongo.NewUpdateOneModel() + um.SetUpsert(true) + um.SetUpdate(w) + um.SetFilter(bson.M{"osm_id": w.OsmID}) + waysBuffer = append(waysBuffer, um) + } + + case n := <-nodesCh: + if initial { + um := mongo.NewInsertOneModel() + um.SetDocument(n) + nodesBuffer = append(nodesBuffer, um) + } else { + um := mongo.NewUpdateOneModel() + um.SetUpsert(true) + um.SetUpdate(n) + um.SetFilter(bson.M{"osm_id": n.OsmID}) + nodesBuffer = append(nodesBuffer, um) + } + case r := <-relationsCh: + if initial { + um := mongo.NewInsertOneModel() + um.SetDocument(r) + relationsBuffer = append(relationsBuffer, um) + } else { + um := mongo.NewUpdateOneModel() + um.SetUpsert(true) + um.SetUpdate(r) + um.SetFilter(bson.M{"osm_id": r.OsmID}) + relationsBuffer = append(relationsBuffer, um) + } + case <-ctx.Done(): + log.Printf("[%d] saving last info in buffers...", worker) + if _, err := nodes.BulkWrite(context.Background(), nodesBuffer, opts); err != nil { + return err + } + if _, err := ways.BulkWrite(context.Background(), waysBuffer, opts); err != nil { + return err + } + if _, err := relations.BulkWrite(context.Background(), relationsBuffer, opts); err != nil { + return err + } + log.Printf("[%d] Done", worker) + return nil + } + if len(nodesBuffer) == blockSize { + nc++ + log.Printf("[%d] nodes %d ways %d relations %d", worker, nc, wc, rc) + if _, err := nodes.BulkWrite(context.Background(), nodesBuffer, opts); err != nil { + return err + } + nodesBuffer = make([]mongo.WriteModel, 0) + } + if len(waysBuffer) == blockSize { + wc++ + log.Printf("[%d] nodes %d ways %d relations %d", worker, nc, wc, rc) + if _, err := ways.BulkWrite(context.Background(), waysBuffer, opts); err != nil { + return err + } + waysBuffer = make([]mongo.WriteModel, 0) + } + if len(relationsBuffer) == blockSize { + rc++ + log.Printf("[%d] nodes %d ways %d relations %d", worker, nc, wc, rc) + if _, err := relations.BulkWrite(context.Background(), relationsBuffer, opts); err != nil { + return err + } + relationsBuffer = make([]mongo.WriteModel, 0) + } + } +} -- cgit v1.2.3