Skip to content

Commit

Permalink
添加限速参数timedelay
Browse files Browse the repository at this point in the history
  • Loading branch information
exlimit committed Oct 2, 2022
1 parent 0295045 commit fab4199
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 121 deletions.
13 changes: 8 additions & 5 deletions conf.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@
directory ="output"
[task]
#number of fetchers
workers = 64
workers = 4
#number of savers
savepipe = 4
savepipe = 1
#min request interval, a speed limit, unit millisecond
timedelay = 100

[tm]
#name for mbtiles
Expand All @@ -25,9 +27,10 @@
#the vector tiles metadata tilejson
# json = ""
#url is the schema url of tiles
# url= "https://api.mapbox.com/v4/mapbox.mapbox-streets-v8/{z}/{x}/{y}.vector.pbf?sku=1016gFniej06a&access_token=pk.eyJ1IjoibWFwYm94IiwiYSI6ImNpejY4M29iazA2Z2gycXA4N2pmbDZmangifQ.-g_vE53SD2WrJ6tFX7QHmA"
# url= "https://api.mapbox.com/v4/mapbox.mapbox-terrain-v2/{z}/{x}/{y}.vector.pbf?sku=1016gFniej06a&access_token=pk.eyJ1IjoibWFwYm94IiwiYSI6ImNpejY4M29iazA2Z2gycXA4N2pmbDZmangifQ.-g_vE53SD2WrJ6tFX7QHmA"
url = "http://mt0.google.com/vt/lyrs=s&x={x}&y={y}&z={z}"
# url = "https://api.mapbox.com/v4/mapbox.mapbox-streets-v8/{z}/{x}/{y}.vector.pbf?sku=1016gFniej06a&access_token=pk.eyJ1IjoibWFwYm94IiwiYSI6ImNpejY4M29iazA2Z2gycXA4N2pmbDZmangifQ.-g_vE53SD2WrJ6tFX7QHmA"
# url = "https://api.mapbox.com/v4/mapbox.mapbox-terrain-v2/{z}/{x}/{y}.vector.pbf?sku=1016gFniej06a&access_token=pk.eyJ1IjoibWFwYm94IiwiYSI6ImNpejY4M29iazA2Z2gycXA4N2pmbDZmangifQ.-g_vE53SD2WrJ6tFX7QHmA"
url = "https://api.maptiler.com/tiles/v3/{z}/{x}/{y}.pbf?key=KDhMfHvorAFkFe64wlZb"
# url = "http://mt0.google.com/vt/lyrs=s&x={x}&y={y}&z={z}"
#lrs can set diff boundaries for diff levels
[[lrs]]
min = 0
Expand Down
3 changes: 2 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ func initConf(cfgFile string) {
viper.SetDefault("output.format", "mbtiles")
viper.SetDefault("output.directory", "output")
viper.SetDefault("task.workers", 4)
viper.SetDefault("task.savepipe", 8)
viper.SetDefault("task.savepipe", 1)
viper.SetDefault("task.timedelay", 0)
}

func main() {
Expand Down
124 changes: 9 additions & 115 deletions task.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"bytes"
"compress/gzip"
"database/sql"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
Expand All @@ -16,8 +15,6 @@ import (
"time"

"github.com/paulmach/orb"
"github.com/paulmach/orb/clip"
"github.com/paulmach/orb/geojson"
"github.com/paulmach/orb/maptile"
"github.com/paulmach/orb/maptile/tilecover"
log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -45,8 +42,8 @@ type Task struct {
db *sql.DB
workerCount int
savePipeSize int
timeDelay int
bufSize int
layerWG sync.WaitGroup
tileWG sync.WaitGroup
abort, pause, play chan struct{}
workers chan maptile.Tile
Expand Down Expand Up @@ -85,6 +82,7 @@ func NewTask(layers []Layer, m TileMap) *Task {

task.workerCount = viper.GetInt("task.workers")
task.savePipeSize = viper.GetInt("task.savepipe")
task.timeDelay = viper.GetInt("task.timedelay")
task.workers = make(chan maptile.Tile, task.workerCount)
task.savingpipe = make(chan Tile, task.savePipeSize)
task.bufSize = viper.GetInt("task.mergebuf")
Expand Down Expand Up @@ -268,8 +266,8 @@ func (task *Task) tileFetcher(t maptile.Tile, url string) {
T: t,
C: body,
}
if task.TileMap.Format == PBF {

if task.TileMap.Format == PBF {
var buf bytes.Buffer
zw := gzip.NewWriter(&buf)
_, err = zw.Write(body)
Expand All @@ -289,8 +287,12 @@ func (task *Task) tileFetcher(t maptile.Tile, url string) {
// task.wg.Add(1)
task.saveTile(tile)
}
secs := time.Since(start).Seconds()
fmt.Printf("tile(z:%d, x:%d, y:%d), %.3fs, %.2f kb, %s ...\n", t.Z, t.X, t.Y, secs, float32(len(body))/1024.0, pbf)
cost := time.Since(start).Milliseconds()
if cost < int64(task.timeDelay) {
time.Sleep(time.Duration(int64(task.timeDelay)-cost) * time.Millisecond)
}
cost = time.Since(start).Milliseconds()
fmt.Printf("tile(z:%d, x:%d, y:%d), %dms , %.2f kb, %s ...\n", t.Z, t.X, t.Y, cost, float32(len(body))/1024.0, pbf)
}

//DownloadZoom 下载指定层级
Expand Down Expand Up @@ -331,45 +333,6 @@ func (task *Task) downloadLayer(layer Layer) {
bar.FinishPrint(fmt.Sprintf("Task %s Zoom %d finished ~", task.ID, layer.Zoom))
}

//DownloadZoom 下载指定层级
func (task *Task) downloadGeom(geom orb.Geometry, zoom int) {
count := tilecover.GeometryCount(geom, maptile.Zoom(zoom))
bar := pb.New64(count).Prefix(fmt.Sprintf("Zoom %d : ", zoom))
bar.Start()

var tilelist = make(chan maptile.Tile, task.bufSize)

go func(g orb.Geometry, z maptile.Zoom, ch chan<- maptile.Tile) {
defer close(ch)
tilecover.GeometryChannel(g, z, ch)
}(geom, maptile.Zoom(zoom), tilelist)

for tile := range tilelist {
// log.Infof(`fetching tile %v ~`, tile)
select {
case task.workers <- tile:
bar.Increment()
task.Bar.Increment()
task.tileWG.Add(1)
go task.tileFetcher(tile, task.TileMap.URL)
case <-task.abort:
log.Infof("Task %s got canceled.", task.ID)
close(tilelist)
case <-task.pause:
log.Infof("Task %s suspended.", task.ID)
select {
case <-task.play:
log.Infof("task %s go on.", task.ID)
case <-task.abort:
log.Infof("task %s got canceled.", task.ID)
close(tilelist)
}
}
}
task.tileWG.Wait()
bar.FinishPrint(fmt.Sprintf("Task %s Zoom %d finished ~", task.ID, zoom))
}

//Download 开启下载任务
func (task *Task) Download() {
//g orb.Geometry, minz int, maxz int
Expand All @@ -386,72 +349,3 @@ func (task *Task) Download() {
}
task.Bar.FinishPrint(fmt.Sprintf("Task %s finished ~", task.ID))
}

//DownloadDepth 深度优先下载
func (task *Task) DownloadDepth() {
task.Bar = pb.New64(task.Total).Prefix("Fetching -> ").Postfix("\n")
task.Bar.Start()
// for _, layer := range task.Layers {
// task.downloadLayer(layer)
// break
// }
for i, layer := range task.Layers {
if i < 1 {
continue
}
task.tileSet.Lock()
zoomSet := task.tileSet.M
mfc := task.tileSet.M.ToFeatureCollection()
ifile := len(task.tileSet.M)
fmt.Printf("merging up append set, %d tiles ~\n", ifile)
fmt.Println("downloading started, Zoom:", layer.Zoom, "Tiles:", ifile)
bar := pb.StartNew(ifile).Prefix(fmt.Sprintf("Zoom %d : ", layer.Zoom)).Postfix("\n")
var wg sync.WaitGroup
wg.Add(1)
go func(name string, mfc *geojson.FeatureCollection) {
defer wg.Done()
data, err := json.MarshalIndent(mfc, "", " ")
if err != nil {
log.Printf("error marshalling json: %v", err)
return
}

err = ioutil.WriteFile(name+".geojson", data, 0644)
if err != nil {
log.Printf("write file failure: %v", err)
}
log.Printf("output finished : %s.geojson", name)

}(strconv.FormatInt(int64(ifile), 10), mfc)

task.tileSet.M = make(maptile.Set)
task.tileSet.Unlock()
cliperBuffer := make(chan orb.Geometry, 16)
go func(set maptile.Set, buffer chan<- orb.Geometry, bar *pb.ProgressBar) {
defer close(buffer)
for t := range set {
bar.Increment()
// buffer <- t.Bound()
log.Println("starting cliper...")
start := time.Now()
for _, g := range layer.Collection {
clipped := clip.Geometry(t.Bound(), g)
secs := time.Since(start).Seconds()
if clipped != nil {
buffer <- clipped
log.Printf("cliper add to buffer,time:%.4fs...", secs)
}
}
}
log.Printf("cliper buffer closing...")
close(buffer)
}(zoomSet, cliperBuffer, bar)

for geom := range cliperBuffer {
task.downloadGeom(geom, layer.Zoom)
}
wg.Wait() //wait for saving
bar.FinishPrint(fmt.Sprintf("zoom %d finished ~", layer.Zoom))
}
task.Bar.FinishPrint(fmt.Sprintf("Task %s finished ~", task.ID))
}

0 comments on commit fab4199

Please sign in to comment.