From 2f4b6e90597784a8a7c01027e0ff5c6b69634a96 Mon Sep 17 00:00:00 2001 From: Alexander NeonXP Kiryukhin Date: Thu, 30 May 2019 14:05:08 +0300 Subject: Parallel write Speedup --- main.go | 306 +++++----------------------------------------------------------- 1 file changed, 21 insertions(+), 285 deletions(-) (limited to 'main.go') diff --git a/main.go b/main.go index df5dcc1..e5dab33 100644 --- a/main.go +++ b/main.go @@ -4,15 +4,13 @@ import ( "context" "flag" "log" - "os" + "strings" "time" - "github.com/paulmach/osm" - "github.com/paulmach/osm/osmpbf" - "go.mongodb.org/mongo-driver/bson" + "github.com/neonxp/rutina" + _ "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" - "go.mongodb.org/mongo-driver/x/bsonx" ) func main() { @@ -20,10 +18,13 @@ func main() { dbname := flag.String("dbname", "map", "Mongo database name") osmfile := flag.String("osmfile", "", "OSM file") initial := flag.Bool("initial", false, "Is initial import") - indexes := flag.Bool("indexes", false, "Just create indexes") - concurrency := flag.Int("concurrency", 16, "Concurrency") + indexes := flag.Bool("indexes", false, "Create indexes") + layersString := flag.String("layers", "nodes,ways,relations", "Layers to import") blockSize := flag.Int("block", 1000, "Block size to bulk write") + concurrency := flag.Int("concurrency", 32, "Concurrency read and write") flag.Parse() + layers := strings.Split(*layersString, ",") + r := rutina.New() ctx, _ := context.WithTimeout(context.Background(), 10*time.Second) client, err := mongo.Connect(ctx, options.Client().ApplyURI(*dbconnection)) if err != nil { @@ -37,289 +38,24 @@ func main() { log.Fatal(err) } log.Println("Indexes created") - return } log.Printf("Started import file %s to db %s", *osmfile, *dbname) - - if err := read(db, *osmfile, *initial, *concurrency, *blockSize); err != nil { - log.Fatal(err) - } -} - -func read(db *mongo.Database, file string, initial bool, concurrency int, blockSize int) error { - nodes := db.Collection("nodes") - ways := db.Collection("ways") - relations := db.Collection("relations") - - f, err := os.Open(file) - if err != nil { - return err - } - defer f.Close() - - opts := (new(options.BulkWriteOptions)).SetOrdered(false) - nc := 0 - wc := 0 - rc := 0 - - scanner := osmpbf.New(context.Background(), f, concurrency) - defer scanner.Close() - - bufferNodes := make([]mongo.WriteModel, 0, blockSize) - bufferWays := make([]mongo.WriteModel, 0, blockSize) - bufferRelations := make([]mongo.WriteModel, 0, blockSize) - for scanner.Scan() { - o := scanner.Object() - switch o := o.(type) { - case *osm.Way: - nodes := make([]int64, 0, len(o.Nodes)) - for _, v := range o.Nodes { - nodes = append(nodes, int64(v.ID)) - } - - w := Way{ - OsmID: int64(o.ID), - Tags: convertTags(o.Tags), - Nodes: nodes, - Timestamp: o.Timestamp, - Version: o.Version, - Visible: o.Visible, - } - if initial { - um := mongo.NewInsertOneModel() - um.SetDocument(w) - bufferWays = append(bufferWays, um) - } else { - um := mongo.NewUpdateOneModel() - um.SetUpsert(true) - um.SetUpdate(w) - um.SetFilter(bson.M{"osm_id": w.OsmID}) - bufferWays = append(bufferWays, um) - } - wc++ - case *osm.Node: - n := Node{ - OsmID: int64(o.ID), - Location: Coords{ - Type: "Point", - Coordinates: []float64{ - o.Lon, - o.Lat, - }}, - Tags: convertTags(o.Tags), - Version: o.Version, - Timestamp: o.Timestamp, - Visible: o.Visible, - } - if initial { - um := mongo.NewInsertOneModel() - um.SetDocument(n) - bufferNodes = append(bufferNodes, um) - } else { - um := mongo.NewUpdateOneModel() - um.SetUpsert(true) - um.SetUpdate(n) - um.SetFilter(bson.M{"osm_id": n.OsmID}) - bufferNodes = append(bufferNodes, um) - } - nc++ - case *osm.Relation: - members := make([]Member, len(o.Members)) - for _, v := range o.Members { - members = append(members, Member{ - Type: v.Type, - Version: v.Version, - Orientation: v.Orientation, - Ref: v.Ref, - Role: v.Role, - Location: Coords{ - Type: "Point", - Coordinates: []float64{ - v.Lon, - v.Lat, - }}, - }) - } - r := Relation{ - OsmID: int64(o.ID), - Tags: convertTags(o.Tags), - Version: o.Version, - Timestamp: o.Timestamp, - Visible: o.Visible, - Members: members, - } - if initial { - um := mongo.NewInsertOneModel() - um.SetDocument(r) - bufferRelations = append(bufferRelations, um) - } else { - um := mongo.NewUpdateOneModel() - um.SetUpsert(true) - um.SetUpdate(r) - um.SetFilter(bson.M{"osm_id": r.OsmID}) - bufferRelations = append(bufferRelations, um) - } - rc++ - } - if len(bufferNodes) == blockSize { - if _, err := nodes.BulkWrite(context.Background(), bufferNodes, opts); err != nil { - return err - } - bufferNodes = make([]mongo.WriteModel, 0, blockSize) - log.Printf("Nodes: %d Ways: %d Relations: %d", nc, wc, rc) - } - if len(bufferWays) == blockSize { - if _, err := nodes.BulkWrite(context.Background(), bufferWays, opts); err != nil { - return err - } - bufferWays = make([]mongo.WriteModel, 0, blockSize) - log.Printf("Nodes: %d Ways: %d Relations: %d", nc, wc, rc) - } - if len(bufferRelations) == blockSize { - if _, err := nodes.BulkWrite(context.Background(), bufferRelations, opts); err != nil { - return err - } - bufferRelations = make([]mongo.WriteModel, 0, blockSize) - log.Printf("Nodes: %d Ways: %d Relations: %d", nc, wc, rc) - } - } - if len(bufferNodes) != 0 { - if _, err := nodes.BulkWrite(context.Background(), bufferNodes, opts); err != nil { - return err - } - log.Printf("Nodes: %d Ways: %d Relations: %d", nc, wc, rc) - } - if len(bufferWays) != 0 { - if _, err := ways.BulkWrite(context.Background(), bufferWays, opts); err != nil { - return err - } - log.Printf("Nodes: %d Ways: %d Relations: %d", nc, wc, rc) - } - if len(bufferRelations) != 0 { - if _, err := relations.BulkWrite(context.Background(), bufferRelations, opts); err != nil { - return err - } - log.Printf("Nodes: %d Ways: %d Relations: %d", nc, wc, rc) - } - log.Println("Import done") - scanErr := scanner.Err() - if scanErr != nil { - return scanErr - } - return nil -} - -func createIndexes(db *mongo.Database) error { - opts := options.CreateIndexes().SetMaxTime(1000) - nodes := db.Collection("nodes") - log.Println("creating indexes for nodes") - created, err := nodes.Indexes().CreateMany(context.Background(), []mongo.IndexModel{ - { - Keys: bsonx.Doc{{"osm_id", bsonx.Int32(-1)}}, - Options: (options.Index()).SetBackground(true).SetSparse(true).SetUnique(false), - }, - { - Keys: bsonx.Doc{ - {"tags.key", bsonx.Int32(-1)}, - {"tags.value", bsonx.Int32(-1)}, - }, - Options: (options.Index()).SetBackground(true).SetSparse(true), - }, - }, opts) - if err != nil { - return err - } - log.Println(created) - log.Println("creating geoindexes for nodes") - if err := geoIndex(nodes, "location"); err != nil { - return err - } - - log.Println("creating indexes for ways") - ways := db.Collection("ways") - created, err = ways.Indexes().CreateMany(context.Background(), []mongo.IndexModel{ - { - Keys: bsonx.Doc{{"osm_id", bsonx.Int32(-1)}}, - Options: (options.Index()).SetBackground(true).SetSparse(true).SetUnique(false), - }, - { - Keys: bsonx.Doc{ - {"tags.key", bsonx.Int32(-1)}, - {"tags.value", bsonx.Int32(-1)}, - }, - Options: (options.Index()).SetBackground(true).SetSparse(true), - }, - }, opts) - if err != nil { - return err - } - log.Println(created) - relations := db.Collection("relations") - log.Println("creating geoindexes for relations") - created, err = relations.Indexes().CreateMany(context.Background(), []mongo.IndexModel{ - { - Keys: bsonx.Doc{{"osm_id", bsonx.Int32(-1)}}, - Options: (options.Index()).SetBackground(true).SetSparse(true).SetUnique(false), - }, - { - Keys: bsonx.Doc{ - {"tags.key", bsonx.Int32(-1)}, - {"tags.value", bsonx.Int32(-1)}, - }, - Options: (options.Index()).SetBackground(true).SetSparse(true), - }, - { - Keys: bsonx.Doc{{"members.ref", bsonx.Int32(-1)}}, - Options: (options.Index()).SetBackground(true).SetSparse(true), - }, - }, opts) - if err != nil { - return err - } - log.Println(created) - if err := geoIndex(relations, "members.coords"); err != nil { - return err - } - log.Println("indexes created") - return nil -} - -func convertTags(tags osm.Tags) []Tag { - result := make([]Tag, 0, len(tags)) - for _, t := range tags { - result = append(result, Tag{ - Key: t.Key, - Value: t.Value, + nodesCh := make(chan Node, 1) + waysCh := make(chan Way, 1) + relationsCh := make(chan Relation, 1) + + for i := 0; i < *concurrency; i++ { + worker := i + r.Go(func(ctx context.Context) error { + return write(ctx, db, nodesCh, waysCh, relationsCh, *initial, *blockSize, worker) }) } - return result -} -func simpleIndex(col *mongo.Collection, keys []string, unique bool) error { - idxKeys := bsonx.Doc{} - for _, e := range keys { - idxKeys.Append(e, bsonx.Int32(1)) + r.Go(func(ctx context.Context) error { + return read(ctx, *osmfile, nodesCh, waysCh, relationsCh, *concurrency, layers) + }) + if err := r.Wait(); err != nil { + log.Fatal(err) } - _, err := col.Indexes().CreateOne( - context.Background(), - mongo.IndexModel{ - Keys: idxKeys, - Options: options.Index().SetUnique(unique).SetSparse(true).SetBackground(true), - }, - ) - return err -} - -func geoIndex(col *mongo.Collection, key string) error { - _, err := col.Indexes().CreateOne( - context.Background(), - mongo.IndexModel{ - Keys: bsonx.Doc{{ - Key: key, Value: bsonx.Int32(1), - }}, - Options: options.Index().SetSphereVersion(2).SetSparse(true).SetBackground(true), - }, - ) - return err } -- cgit v1.2.3