Skip to content

Commit

Permalink
Merge pull request #1 from nkmr-jp/feat-multi-url
Browse files Browse the repository at this point in the history
Feat multi url
  • Loading branch information
nkmr-jp committed Jul 22, 2022
2 parents 9b44055 + 5a45b05 commit e901c2b
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 43 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ PROJECT_NUMBER=$(shell gcloud projects list --filter="project_id:$(PROJECT_ID)"
FUNC_NAME=fetch
ENTRY_POINT=Fetch
TOPIC_NAME=$(FUNC_NAME)-topic
BUCKET_NAME=$(PROJECT_ID)-fetch
BUCKET_NAME=$(PROJECT_ID)-data-fetch
# VERSION=$(shell git rev-parse --short HEAD)
VERSION=$(shell git describe --abbrev=0 --tags)

Expand Down
27 changes: 18 additions & 9 deletions fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"net/http"
nu "net/url"
"os"
"strings"
"time"

"cloud.google.com/go/storage"
Expand All @@ -28,12 +29,17 @@ func Run(ctx context.Context, event event.Event) error {
defer zl.Sync() // Flush log file buffer. for debug in mac local.

bucket := getEnv()
url := parseEvent(event)
gcsPath := parseURL(url)
buf := get(url)

if err := save(ctx, bucket, gcsPath, buf); err != nil {
return err
urls := parseEvent(event)
if urls == nil {
return fmt.Errorf("urls is nil")
}
for i := range urls {
gcsPath := parseURL(urls[i])
buf := get(urls[i])
if err := save(ctx, bucket, gcsPath, buf); err != nil {
return err
}
time.Sleep(time.Second)
}

return nil
Expand Down Expand Up @@ -78,20 +84,22 @@ func parseURL(s string) string {
return url.Host + url.Path
}

func parseEvent(event event.Event) string {
func parseEvent(event event.Event) []string {
var data pubsub.MessagePublishedData

err := json.Unmarshal(event.Data(), &data)
if err != nil {
zl.Error("UNMARSHAL_ERROR", err)
return ""
return nil
}
zl.Info("CLOUD_EVENT_RECEIVED",
zap.String("cloudEventContext", event.Context.String()),
zap.Any("cloudEventData", data),
)

return string(data.Message.Data)
// return string(data.Message.Data)
dataStr := strings.TrimSpace(string(data.Message.Data))
return strings.Split(dataStr, "\n")
}

func getEnv() (bucket string) {
Expand Down Expand Up @@ -135,6 +143,7 @@ func writeBuf(ctx context.Context, client *storage.Client, bucket, object string

// writer close
fields := []zap.Field{
zl.Console(object),
zap.String("bucket", bucket),
zap.String("object", object),
}
Expand Down
111 changes: 78 additions & 33 deletions fetch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,56 +11,101 @@ import (
"github.com/googleapis/google-cloudevents-go/cloud/pubsub/v1"
fetch "github.com/nkmr-jp/gcf-fetch"
"github.com/stretchr/testify/assert"
"google.golang.org/api/iterator"
)

func TestRun(t *testing.T) {
test := NewTestFetch()
test.setup(t)
objPath := "api.github.com/users/github"
ctx := context.Background()
client, _ := storage.NewClient(ctx)
test := NewTestFetch(t)

// Get generation before send pubsub message.
var preGeneration int64
rc, err := client.Bucket(os.Getenv("BUCKET_NAME")).Object(objPath).NewReader(ctx)
defer rc.Close() // nolint
if err == nil {
preGeneration = rc.Attrs.Generation
}
t.Run("single url", func(t *testing.T) {
objPath := "api.github.com/users/github"
pubsubData := "https://api.github.com/users/github"
ctx := context.Background()
test.deleteObjects(ctx, "api.github.com")

// Send pubsub message
if err := fetch.Run(ctx, test.event); err != nil {
assert.Fail(t, err.Error())
}
// Send pubsub message1
if err := fetch.Run(ctx, test.event(pubsubData)); err != nil {
assert.Fail(t, err.Error())
}
reader1 := test.getObject(ctx, objPath)

// Get generation after send pubsub message.
rc2, err := client.Bucket(os.Getenv("BUCKET_NAME")).Object(objPath).NewReader(ctx)
defer rc.Close() // nolint
if err != nil {
assert.Fail(t, err.Error())
}
// Send pubsub message2
if err := fetch.Run(ctx, test.event(pubsubData)); err != nil {
assert.Fail(t, err.Error())
}
reader2 := test.getObject(ctx, objPath)
// Get generation after send pubsub message.
assert.NotEqual(t, reader1.Attrs.Generation, reader2.Attrs.Generation)
})

t.Run("multi url", func(t *testing.T) {
pubsubData := `
https://api.github.com/users/github
https://api.github.com/users/github/followers
`
ctx := context.Background()
test.deleteObjects(ctx, "api.github.com")
if err := fetch.Run(ctx, test.event(pubsubData)); err != nil {
assert.Fail(t, err.Error())
}

assert.NotEqual(t, preGeneration, rc2.Attrs.Generation)
assert.NotNilf(t, test.getObject(ctx, "api.github.com/users/github"), "reader1")
assert.NotNilf(t, test.getObject(ctx, "api.github.com/users/github/followers"), "reader2")
})
}

type TestFetch struct {
event event.Event
t *testing.T
}

func NewTestFetch(t *testing.T) *TestFetch {
return &TestFetch{t}
}

func NewTestFetch() *TestFetch {
return &TestFetch{}
func (f *TestFetch) deleteObjects(ctx context.Context, objPath string) {
client, _ := storage.NewClient(ctx)
query := storage.Query{Prefix: objPath, Versions: true}
bucket := client.Bucket(os.Getenv("BUCKET_NAME"))
it := bucket.Objects(ctx, &query)
for {
objAttrs, err := it.Next()
if err != nil && err != iterator.Done {
assert.Fail(f.t, err.Error())
}
if err == iterator.Done {
break
}
if err := bucket.Object(objAttrs.Name).Generation(objAttrs.Generation).Delete(ctx); err != nil {
assert.Fail(f.t, err.Error())
}
}
}

func (f *TestFetch) getObject(ctx context.Context, objPath string) *storage.Reader {
client, _ := storage.NewClient(ctx)
reader, err := client.Bucket(os.Getenv("BUCKET_NAME")).Object(objPath).NewReader(ctx)
defer func(rc *storage.Reader) {
err := rc.Close()
if err != nil {
assert.Fail(f.t, err.Error())
}
}(reader)
if err != nil {
assert.Fail(f.t, err.Error())
}
return reader
}

func (f *TestFetch) setup(t *testing.T) {
func (f *TestFetch) event(data string) event.Event {
msg := pubsub.MessagePublishedData{
Message: &pubsub.Message{
Data: []byte("https://api.github.com/users/github"),
Data: []byte(data),
},
}

f.event = event.New()
f.event.SetDataContentType("application/json")
if err := f.event.SetData(f.event.DataContentType(), msg); err != nil {
assert.Fail(t, err.Error())
e := event.New()
e.SetDataContentType("application/json")
if err := e.SetData(e.DataContentType(), msg); err != nil {
assert.Fail(f.t, err.Error())
}
return e
}

0 comments on commit e901c2b

Please sign in to comment.