Bulkservice proposal #3

Open
notzippy opened this issue Dec 11, 2015 · 3 comments

@notzippy

I noticed that the bulk service is not implemented. I am not sure whether you wanted to add support for it, but I thought I would toss out a possible implementation.

package crate

import (
    "bytes"
    "encoding/json"
    "fmt"
    "net/http"
    "net/url"
    "reflect"
    "sync"
    "github.com/herenow/go-crate"
)

type (
    CrateBulkDriver struct {
        Url    string // Crate http endpoint url
        client *http.Client
        lock   sync.Mutex
    }
    CrateService struct {
        Statement string           `json:"stmt"`
        BulkArgs  [][]interface{}  `json:"bulk_args"`
        Fields    []string         `json:"-"`
        Table     string           `json:"-"`
        Driver    *CrateBulkDriver `json:"-"`
    }
    InsertService struct {
        CrateService
    }
    BulkResponse struct {
        Error    struct {
                    Message string
                    Code    int
                } `json:"error"`
        Cols     []string             `json:"cols"`
        Duration float64              `json:"duration"`
        Results  []BulkResponseResult `json:"results"`
    }
    BulkResponseResult struct {
        Rowcount int `json:"rowcount"`
    }
)

// Init a new "Connection" to a Crate Data Storage instance.
// Note that the connection is not tested until the first query.
func NewBulkConnection(crate_url string) (c *CrateBulkDriver, e error) {
    c = &CrateBulkDriver{
        lock: sync.Mutex{},
    }
    u, err := url.Parse(crate_url)

    if err != nil {
        return nil, err
    }

    sanUrl := fmt.Sprintf("%s://%s", u.Scheme, u.Host)

    c.Url = sanUrl
    c.client = &http.Client{}

    return c, nil
}

// Creates a new insert service for crate, using the table and fields
func (driver *CrateBulkDriver) NewInsertService(table string, fields ...string) (i *InsertService) {
    buffer := &bytes.Buffer{}
    values := &bytes.Buffer{}
    fmt.Fprintf(buffer, "INSERT INTO %s (", table)
    fmt.Fprintf(values, "VALUES (")
    first := true
    for _, field := range fields {
        if !first {
            buffer.WriteString(", ")
            values.WriteString(", ")
        } else {
            first = false
        }
        buffer.WriteString(field)
        values.WriteString("?")
    }
    buffer.WriteString(") ")
    values.WriteString(") ")
    buffer.Write(values.Bytes())
    i = &InsertService{
        CrateService{
            Statement: buffer.String(),
            Table:     table,
            Fields:    fields,
            Driver:    driver,
        },
    }
    return i
}

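// Add appends one row of bulk args built from the struct that data points to.
// It panics if data is not a pointer to a struct or if a name in Fields is not
// an exported field of that struct.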
func (service *InsertService) Add(data interface{}) {
    // Simplest addition
    orderedValues := make([]interface{}, len(service.Fields))
    for i, v := range service.Fields {
        orderedValues[i] = reflect.ValueOf(data).Elem().FieldByName(v).Interface()
    }
    service.BulkArgs = append(service.BulkArgs, orderedValues)

}

// AddMap appends one row of bulk args from a map keyed by field name;
// fields missing from the map are sent as nil.
func (service *InsertService) AddMap(data map[string]interface{}) {
    // Simplest addition
    orderedValues := make([]interface{}, len(service.Fields))
    for i, v := range service.Fields {
        orderedValues[i] = data[v]
    }
    service.BulkArgs = append(service.BulkArgs, orderedValues)
}

// Perform the bulk operation on this service
func (service *CrateService) Do() (*BulkResponse, error) {
    service.Driver.lock.Lock()
    defer service.Driver.lock.Unlock()
    res := &BulkResponse{}
    if len(service.BulkArgs) == 0 {
        return res, nil
    }
    endpoint := service.Driver.Url + "/_sql?pretty"

    buf, err := json.Marshal(service)

    if err != nil {
        return nil, err
    }

    data := bytes.NewReader(buf)

    resp, err := service.Driver.client.Post(endpoint, "application/json", data)
    if err != nil {
        return nil, err
    }

    defer resp.Body.Close()
    d := json.NewDecoder(resp.Body)

    // We need to set this, or long integers will be interpreted as floats
    d.UseNumber()

    err = d.Decode(res)

    //fmt.Printf("\n\n response: %#v \n",res)

    if err != nil {
        return nil, err
    }

    // Check for db errors
    if res.Error.Code != 0 {
        err = &crate.CrateErr{
            Code:    res.Error.Code,
            Message: res.Error.Message,
        }
        return nil, err
    }

    return res, nil
}

I haven't done the update or delete services, but as you can see they would be pretty easy to add. The only extra thing I can see is adding a function to BulkResponse to check whether any of the results returned a -1...

Thoughts?

@herenow
Owner

herenow commented Dec 14, 2015

We could add a bulk package to this project. I was unsure whether I wanted to maintain extra packages in this project, such as the blob package, but I think this is the way to go.

We could probably use the database/sql package and return Rows in the BulkResponse.

@0i

0i commented Aug 1, 2017

I also need this feature.

@kiura24metrics

Any updates on this?
