From fab4199710ee92f4be993d911150c6d9baaed4a0 Mon Sep 17 00:00:00 2001 From: exlimit Date: Sun, 2 Oct 2022 23:25:17 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E9=99=90=E9=80=9F=E5=8F=82?= =?UTF-8?q?=E6=95=B0timedelay?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- conf.toml | 13 +++--- main.go | 3 +- task.go | 124 ++++-------------------------------------------------- 3 files changed, 19 insertions(+), 121 deletions(-) diff --git a/conf.toml b/conf.toml index 78f167d4..ff79e5e3 100644 --- a/conf.toml +++ b/conf.toml @@ -8,9 +8,11 @@ directory ="output" [task] #number of fetchers - workers = 64 + workers = 4 #number of savers - savepipe = 4 + savepipe = 1 + #min request interval, a speed limit, unit millisecond + timedelay = 100 [tm] #name for mbtiles @@ -25,9 +27,10 @@ #the vector tiles metadata tilejson # json = "" #url is the schema url of tiles - # url= "https://api.mapbox.com/v4/mapbox.mapbox-streets-v8/{z}/{x}/{y}.vector.pbf?sku=1016gFniej06a&access_token=pk.eyJ1IjoibWFwYm94IiwiYSI6ImNpejY4M29iazA2Z2gycXA4N2pmbDZmangifQ.-g_vE53SD2WrJ6tFX7QHmA" - # url= "https://api.mapbox.com/v4/mapbox.mapbox-terrain-v2/{z}/{x}/{y}.vector.pbf?sku=1016gFniej06a&access_token=pk.eyJ1IjoibWFwYm94IiwiYSI6ImNpejY4M29iazA2Z2gycXA4N2pmbDZmangifQ.-g_vE53SD2WrJ6tFX7QHmA" - url = "http://mt0.google.com/vt/lyrs=s&x={x}&y={y}&z={z}" + # url = "https://api.mapbox.com/v4/mapbox.mapbox-streets-v8/{z}/{x}/{y}.vector.pbf?sku=1016gFniej06a&access_token=pk.eyJ1IjoibWFwYm94IiwiYSI6ImNpejY4M29iazA2Z2gycXA4N2pmbDZmangifQ.-g_vE53SD2WrJ6tFX7QHmA" + # url = "https://api.mapbox.com/v4/mapbox.mapbox-terrain-v2/{z}/{x}/{y}.vector.pbf?sku=1016gFniej06a&access_token=pk.eyJ1IjoibWFwYm94IiwiYSI6ImNpejY4M29iazA2Z2gycXA4N2pmbDZmangifQ.-g_vE53SD2WrJ6tFX7QHmA" + url = "https://api.maptiler.com/tiles/v3/{z}/{x}/{y}.pbf?key=KDhMfHvorAFkFe64wlZb" + # url = "http://mt0.google.com/vt/lyrs=s&x={x}&y={y}&z={z}" #lrs can set diff boundaries for diff levels [[lrs]] min = 0 diff --git a/main.go b/main.go index d72aa0d4..8eb78483 100644 --- a/main.go +++ b/main.go @@ -61,7 +61,8 @@ func initConf(cfgFile string) { viper.SetDefault("output.format", "mbtiles") viper.SetDefault("output.directory", "output") viper.SetDefault("task.workers", 4) - viper.SetDefault("task.savepipe", 8) + viper.SetDefault("task.savepipe", 1) + viper.SetDefault("task.timedelay", 0) } func main() { diff --git a/task.go b/task.go index e11e30d9..a7b7d46d 100644 --- a/task.go +++ b/task.go @@ -4,7 +4,6 @@ import ( "bytes" "compress/gzip" "database/sql" - "encoding/json" "fmt" "io/ioutil" "net/http" @@ -16,8 +15,6 @@ import ( "time" "github.com/paulmach/orb" - "github.com/paulmach/orb/clip" - "github.com/paulmach/orb/geojson" "github.com/paulmach/orb/maptile" "github.com/paulmach/orb/maptile/tilecover" log "github.com/sirupsen/logrus" @@ -45,8 +42,8 @@ type Task struct { db *sql.DB workerCount int savePipeSize int + timeDelay int bufSize int - layerWG sync.WaitGroup tileWG sync.WaitGroup abort, pause, play chan struct{} workers chan maptile.Tile @@ -85,6 +82,7 @@ func NewTask(layers []Layer, m TileMap) *Task { task.workerCount = viper.GetInt("task.workers") task.savePipeSize = viper.GetInt("task.savepipe") + task.timeDelay = viper.GetInt("task.timedelay") task.workers = make(chan maptile.Tile, task.workerCount) task.savingpipe = make(chan Tile, task.savePipeSize) task.bufSize = viper.GetInt("task.mergebuf") @@ -268,8 +266,8 @@ func (task *Task) tileFetcher(t maptile.Tile, url string) { T: t, C: body, } - if task.TileMap.Format == PBF { + if task.TileMap.Format == PBF { var buf bytes.Buffer zw := gzip.NewWriter(&buf) _, err = zw.Write(body) @@ -289,8 +287,12 @@ func (task *Task) tileFetcher(t maptile.Tile, url string) { // task.wg.Add(1) task.saveTile(tile) } - secs := time.Since(start).Seconds() - fmt.Printf("tile(z:%d, x:%d, y:%d), %.3fs, %.2f kb, %s ...\n", t.Z, t.X, t.Y, secs, float32(len(body))/1024.0, pbf) + cost := time.Since(start).Milliseconds() + if cost < int64(task.timeDelay) { + time.Sleep(time.Duration(int64(task.timeDelay)-cost) * time.Millisecond) + } + cost = time.Since(start).Milliseconds() + fmt.Printf("tile(z:%d, x:%d, y:%d), %dms , %.2f kb, %s ...\n", t.Z, t.X, t.Y, cost, float32(len(body))/1024.0, pbf) } //DownloadZoom 下载指定层级 @@ -331,45 +333,6 @@ func (task *Task) downloadLayer(layer Layer) { bar.FinishPrint(fmt.Sprintf("Task %s Zoom %d finished ~", task.ID, layer.Zoom)) } -//DownloadZoom 下载指定层级 -func (task *Task) downloadGeom(geom orb.Geometry, zoom int) { - count := tilecover.GeometryCount(geom, maptile.Zoom(zoom)) - bar := pb.New64(count).Prefix(fmt.Sprintf("Zoom %d : ", zoom)) - bar.Start() - - var tilelist = make(chan maptile.Tile, task.bufSize) - - go func(g orb.Geometry, z maptile.Zoom, ch chan<- maptile.Tile) { - defer close(ch) - tilecover.GeometryChannel(g, z, ch) - }(geom, maptile.Zoom(zoom), tilelist) - - for tile := range tilelist { - // log.Infof(`fetching tile %v ~`, tile) - select { - case task.workers <- tile: - bar.Increment() - task.Bar.Increment() - task.tileWG.Add(1) - go task.tileFetcher(tile, task.TileMap.URL) - case <-task.abort: - log.Infof("Task %s got canceled.", task.ID) - close(tilelist) - case <-task.pause: - log.Infof("Task %s suspended.", task.ID) - select { - case <-task.play: - log.Infof("task %s go on.", task.ID) - case <-task.abort: - log.Infof("task %s got canceled.", task.ID) - close(tilelist) - } - } - } - task.tileWG.Wait() - bar.FinishPrint(fmt.Sprintf("Task %s Zoom %d finished ~", task.ID, zoom)) -} - //Download 开启下载任务 func (task *Task) Download() { //g orb.Geometry, minz int, maxz int @@ -386,72 +349,3 @@ func (task *Task) Download() { } task.Bar.FinishPrint(fmt.Sprintf("Task %s finished ~", task.ID)) } - -//DownloadDepth 深度优先下载 -func (task *Task) DownloadDepth() { - task.Bar = pb.New64(task.Total).Prefix("Fetching -> ").Postfix("\n") - task.Bar.Start() - // for _, layer := range task.Layers { - // task.downloadLayer(layer) - // break - // } - for i, layer := range task.Layers { - if i < 1 { - continue - } - task.tileSet.Lock() - zoomSet := task.tileSet.M - mfc := task.tileSet.M.ToFeatureCollection() - ifile := len(task.tileSet.M) - fmt.Printf("merging up append set, %d tiles ~\n", ifile) - fmt.Println("downloading started, Zoom:", layer.Zoom, "Tiles:", ifile) - bar := pb.StartNew(ifile).Prefix(fmt.Sprintf("Zoom %d : ", layer.Zoom)).Postfix("\n") - var wg sync.WaitGroup - wg.Add(1) - go func(name string, mfc *geojson.FeatureCollection) { - defer wg.Done() - data, err := json.MarshalIndent(mfc, "", " ") - if err != nil { - log.Printf("error marshalling json: %v", err) - return - } - - err = ioutil.WriteFile(name+".geojson", data, 0644) - if err != nil { - log.Printf("write file failure: %v", err) - } - log.Printf("output finished : %s.geojson", name) - - }(strconv.FormatInt(int64(ifile), 10), mfc) - - task.tileSet.M = make(maptile.Set) - task.tileSet.Unlock() - cliperBuffer := make(chan orb.Geometry, 16) - go func(set maptile.Set, buffer chan<- orb.Geometry, bar *pb.ProgressBar) { - defer close(buffer) - for t := range set { - bar.Increment() - // buffer <- t.Bound() - log.Println("starting cliper...") - start := time.Now() - for _, g := range layer.Collection { - clipped := clip.Geometry(t.Bound(), g) - secs := time.Since(start).Seconds() - if clipped != nil { - buffer <- clipped - log.Printf("cliper add to buffer,time:%.4fs...", secs) - } - } - } - log.Printf("cliper buffer closing...") - close(buffer) - }(zoomSet, cliperBuffer, bar) - - for geom := range cliperBuffer { - task.downloadGeom(geom, layer.Zoom) - } - wg.Wait() //wait for saving - bar.FinishPrint(fmt.Sprintf("zoom %d finished ~", layer.Zoom)) - } - task.Bar.FinishPrint(fmt.Sprintf("Task %s finished ~", task.ID)) -}