diff --git a/ais/test/etl_test.go b/ais/test/etl_test.go index 03ae2c9a15..9c67a36cfc 100644 --- a/ais/test/etl_test.go +++ b/ais/test/etl_test.go @@ -807,9 +807,10 @@ func TestETLHealth(t *testing.T) { time.Sleep(10 * time.Second) } + // TODO -- FIXME: see health handlers returning "OK" - revisit for _, msg := range healths { - tassert.Errorf(t, msg.Status == etl.HealthStatusRunning, "Expected pod at %s to be %q, got %q", - meta.Tname(msg.TargetID), etl.HealthStatusRunning, msg.Status) + tassert.Errorf(t, msg.Status == "Running", "Expected pod at %s to be running, got %q", + meta.Tname(msg.TargetID), msg.Status) } } diff --git a/ext/etl/api.go b/ext/etl/api.go index 2b29a4d3cb..10253b712f 100644 --- a/ext/etl/api.go +++ b/ext/etl/api.go @@ -25,47 +25,68 @@ const ( Code = "code" ) -const HealthStatusRunning = "Running" // TODO: add the full enum, if exists +// enum communication types (`commTypes`) +const ( + // ETL container receives POST request from target with the data. It + // must read the data and return response to the target which then will be + // transferred to the client. + Hpush = "hpush://" + // Target redirects the GET request to the ETL container. Then ETL container + // contacts the target via `AIS_TARGET_URL` env variable to get the data. + // The data is then transformed and returned to the client. + Hpull = "hpull://" + // Similar to redirection strategy but with usage of reverse proxy. + Hrev = "hrev://" + // Stdin/stdout communication. + HpushStdin = "io://" +) + +// enum arg types (`argTypes`) +const ( + ArgTypeDefault = "" + ArgTypeURL = "url" + ArgTypeFQN = "fqn" +) type ( InitMsg interface { Name() string - Type() string // Code or Spec + MsgType() string // Code or Spec CommType() string + ArgType() string Validate() error String() string } // and implementations InitMsgBase struct { - IDX string `json:"id"` - CommTypeX string `json:"communication"` + IDX string `json:"id"` // etlName (not to be confused) + CommTypeX string `json:"communication"` // enum commTypes + ArgTypeX string `json:"argument"` // enum argTypes Timeout cos.Duration `json:"timeout"` } InitSpecMsg struct { InitMsgBase - Spec []byte `json:"spec"` // NOTE: eq. `Spec` + Spec []byte `json:"spec"` } InitCodeMsg struct { InitMsgBase - Code []byte `json:"code"` // NOTE: eq. `Code` - Deps []byte `json:"dependencies"` - Runtime string `json:"runtime"` - TransformURL bool `json:"transform_url"` + Code []byte `json:"code"` + Deps []byte `json:"dependencies"` + Runtime string `json:"runtime"` // ======================================================================================== - // `InitCodeMsg` carries the name of the transforming function; + // InitCodeMsg carries the name of the transforming function; // the `Transform` function is mandatory and cannot be "" (empty) - it _will_ be called - // by the `Runtime` container (see etl/runtime/all.go for all supported pre-built runtimes); + // by the `Runtime` container (see etl/runtime/all.go for all supported pre-built runtimes); // ========================================================================================= - // TODO -- FIXME: decide if we need to remove nested struct for funcs Funcs struct { Transform string `json:"transform"` // cannot be omitted } // 0 (zero) - read the entire payload in memory and then transform it in one shot; // > 0 - use chunk-size buffering and transform incrementally, one chunk at a time ChunkSize int64 `json:"chunk_size"` - // bitwise flags: (streaming | debug | strict | ...) + // bitwise flags: (streaming | debug | strict | ...) future enhancements Flags int64 `json:"flags"` } ) @@ -100,45 +121,35 @@ type ( } ) -const ( - // ETL container receives POST request from target with the data. It - // must read the data and return response to the target which then will be - // transferred to the client. - Hpush = "hpush://" - // Target redirects the GET request to the ETL container. Then ETL container - // contacts the target via `AIS_TARGET_URL` env variable to get the data. - // The data is then transformed and returned to the client. - Hpull = "hpull://" - // Similar to redirection strategy but with usage of reverse proxy. - Hrev = "hrev://" - // Stdin/stdout communication. - HpushStdin = "io://" +var ( + commTypes = []string{Hpush, Hpull, Hrev, HpushStdin} // NOTE: must contain all + argTypes = []string{ArgTypeDefault, ArgTypeURL, ArgTypeFQN} // ditto ) -var commTypes = []string{Hpush, Hpull, Hrev, HpushStdin} // NOTE: must contain all - //////////////// // InitMsg*** // //////////////// -// interface guards +// interface guard var ( _ InitMsg = (*InitCodeMsg)(nil) _ InitMsg = (*InitSpecMsg)(nil) ) +// TransformURL bool `json:"transform_url"` TODO -- FIXME + func (m InitMsgBase) CommType() string { return m.CommTypeX } +func (m InitMsgBase) ArgType() string { return m.ArgTypeX } func (m InitMsgBase) Name() string { return m.IDX } - -func (*InitCodeMsg) Type() string { return Code } -func (*InitSpecMsg) Type() string { return Spec } +func (*InitCodeMsg) MsgType() string { return Code } +func (*InitSpecMsg) MsgType() string { return Spec } func (m *InitCodeMsg) String() string { - return fmt.Sprintf("init-%s[%s-%s-%s]", Code, m.IDX, m.CommTypeX, m.Runtime) + return fmt.Sprintf("init-%s[%s-%s-%s-%s]", Code, m.IDX, m.CommTypeX, m.ArgTypeX, m.Runtime) } func (m *InitSpecMsg) String() string { - return fmt.Sprintf("init-%s[%s-%s]", Spec, m.IDX, m.CommTypeX) + return fmt.Sprintf("init-%s[%s-%s-%s]", Spec, m.IDX, m.CommTypeX, m.ArgTypeX) } // TODO: double-take, unmarshaling-wise. To avoid, include (`Spec`, `Code`) in API calls @@ -161,10 +172,37 @@ func UnmarshalInitMsg(b []byte) (msg InitMsg, err error) { return } -func (m *InitCodeMsg) Validate() error { +func (m *InitMsgBase) validate(detail string) error { if err := k8s.ValidateEtlName(m.IDX); err != nil { - return fmt.Errorf("%v (%q, comm-type %q)", err, m.Runtime, m.CommTypeX) + return fmt.Errorf("%v [%s]", err, detail) } + errCtx := &cmn.ETLErrCtx{ETLName: m.Name()} + if err := validateCommType(m.CommType()); err != nil { + return cmn.NewErrETL(errCtx, "%v [%s]", err, detail) + } + + if !cos.StringInSlice(m.ArgTypeX, argTypes) { + err := fmt.Errorf("unsupported arg-type %q", m.ArgTypeX) + return cmn.NewErrETL(errCtx, "%v [%s]", err, detail) + } + if m.ArgTypeX == ArgTypeURL && m.CommTypeX != Hpull { + err := fmt.Errorf("arg-type %q requires comm-type %q (%q is not supported yet)", m.ArgTypeX, Hpull, m.CommTypeX) + return cmn.NewErrETL(errCtx, "%v [%s]", err, detail) + } + + // NOTE: default comm-type + if m.CommType() == "" { + cos.Infof("Warning: empty comm-type, defaulting to %q", Hpush) + m.CommTypeX = Hpush + } + return nil +} + +func (m *InitCodeMsg) Validate() error { + if err := m.InitMsgBase.validate(m.String()); err != nil { + return err + } + if len(m.Code) == 0 { return fmt.Errorf("source code is empty (%q)", m.Runtime) } @@ -174,12 +212,7 @@ func (m *InitCodeMsg) Validate() error { if _, ok := runtime.Get(m.Runtime); !ok { return fmt.Errorf("unsupported runtime %q (supported: %v)", m.Runtime, runtime.GetNames()) } - if m.CommTypeX == "" { - cos.Infof("Warning: empty comm-type, defaulting to %q (%q)", Hpush, m.Runtime) - m.CommTypeX = Hpush - } else if !cos.StringInSlice(m.CommTypeX, commTypes) { - return fmt.Errorf("unsupported comm-type %q (%q)", m.CommTypeX, m.Runtime) - } + if m.Funcs.Transform == "" { return fmt.Errorf("transform function cannot be empty (comm-type %q, funcs %+v)", m.CommTypeX, m.Funcs) } @@ -190,37 +223,18 @@ func (m *InitCodeMsg) Validate() error { return nil } -func ParsePodSpec(errCtx *cmn.ETLErrCtx, spec []byte) (*corev1.Pod, error) { - obj, _, err := scheme.Codecs.UniversalDeserializer().Decode(spec, nil, nil) - if err != nil { - return nil, cmn.NewErrETL(errCtx, "failed to parse pod spec: %v\n%q", err, string(spec)) - } - pod, ok := obj.(*corev1.Pod) - if !ok { - kind := obj.GetObjectKind().GroupVersionKind().Kind - return nil, cmn.NewErrETL(errCtx, "expected pod spec, got: %s", kind) - } - return pod, nil -} - func (m *InitSpecMsg) Validate() (err error) { - if err := k8s.ValidateEtlName(m.IDX); err != nil { + if err := m.InitMsgBase.validate(m.String()); err != nil { return err } + errCtx := &cmn.ETLErrCtx{ETLName: m.Name()} + + // Check pod specification constraints. pod, err := ParsePodSpec(errCtx, m.Spec) if err != nil { return err } - - if err := validateCommType(m.CommType()); err != nil { - return cmn.NewErrETL(errCtx, err.Error()) - } - if m.CommType() == "" { - m.CommTypeX = Hpush - } - - // Check pod specification constraints. if len(pod.Spec.Containers) != 1 { err = cmn.NewErrETL(errCtx, "unsupported number of containers (%d), expected: 1", len(pod.Spec.Containers)) return @@ -254,6 +268,19 @@ func (m *InitSpecMsg) Validate() (err error) { return nil } +func ParsePodSpec(errCtx *cmn.ETLErrCtx, spec []byte) (*corev1.Pod, error) { + obj, _, err := scheme.Codecs.UniversalDeserializer().Decode(spec, nil, nil) + if err != nil { + return nil, cmn.NewErrETL(errCtx, "failed to parse pod spec: %v\n%q", err, string(spec)) + } + pod, ok := obj.(*corev1.Pod) + if !ok { + kind := obj.GetObjectKind().GroupVersionKind().Kind + return nil, cmn.NewErrETL(errCtx, "expected pod spec, got: %s", kind) + } + return pod, nil +} + ////////////// // InfoList // ////////////// diff --git a/ext/etl/comm_test.go b/ext/etl/comm_test.go index ff5abdd71b..8b55135876 100644 --- a/ext/etl/comm_test.go +++ b/ext/etl/comm_test.go @@ -111,19 +111,19 @@ var _ = Describe("CommunicatorTest", func() { pod.SetName("somename") xctn := mock.NewXact(apc.ActETLInline) - comm = makeCommunicator(commArgs{ - boot: &etlBootstrapper{ - t: tMock, - msg: InitSpecMsg{ - InitMsgBase: InitMsgBase{ - CommTypeX: commType, - }, + boot := &etlBootstrapper{ + t: tMock, + msg: InitSpecMsg{ + InitMsgBase: InitMsgBase{ + CommTypeX: commType, }, - pod: pod, - uri: transformerServer.URL, - xctn: xctn, }, - }) + pod: pod, + uri: transformerServer.URL, + xctn: xctn, + } + comm = newCommunicator(nil, boot) + resp, err := http.Get(proxyServer.URL) Expect(err).NotTo(HaveOccurred()) defer resp.Body.Close() diff --git a/ext/etl/communicator.go b/ext/etl/communicator.go index 57532742e3..4b7ceb6bca 100644 --- a/ext/etl/communicator.go +++ b/ext/etl/communicator.go @@ -41,8 +41,6 @@ type ( PodName() string SvcName() string - CommType() string - String() string // InlineTransform uses one of the two ETL container endpoints: @@ -60,35 +58,20 @@ type ( CommStats } - commArgs struct { + baseComm struct { listener meta.Slistener boot *etlBootstrapper } - - baseComm struct { - meta.Slistener - t cluster.Target - xctn cluster.Xact - config *cmn.Config - name string - podName string - commType string - } - pushComm struct { baseComm - mem *memsys.MMSA - uri string command []string } redirectComm struct { baseComm - uri string } revProxyComm struct { baseComm - rp *httputil.ReverseProxy - uri string + rp *httputil.ReverseProxy } // TODO: Generalize and move to `cos` package @@ -111,31 +94,26 @@ var ( // baseComm // ////////////// -func makeCommunicator(args commArgs) Communicator { - baseComm := baseComm{ - Slistener: args.listener, - t: args.boot.t, - xctn: args.boot.xctn, - config: args.boot.config, - name: args.boot.originalPodName, - podName: args.boot.pod.Name, - } - - switch args.boot.msg.CommTypeX { - case Hpush: - baseComm.commType = Hpush - return &pushComm{ - baseComm: baseComm, - mem: args.boot.t.PageMM(), - uri: args.boot.uri, +func newCommunicator(listener meta.Slistener, boot *etlBootstrapper) Communicator { + switch boot.msg.CommTypeX { + case Hpush, HpushStdin: + pc := &pushComm{} + pc.listener, pc.boot = listener, boot + if boot.msg.CommTypeX == HpushStdin { // io:// + pc.command = boot.originalCommand } + return pc case Hpull: - baseComm.commType = Hpull - return &redirectComm{baseComm: baseComm, uri: args.boot.uri} + rc := &redirectComm{} + rc.listener, rc.boot = listener, boot + return rc case Hrev: - transformerURL, err := url.Parse(args.boot.uri) + rp := &revProxyComm{} + rp.listener, rp.boot = listener, boot + + transformerURL, err := url.Parse(boot.uri) debug.AssertNoErr(err) - rp := &httputil.ReverseProxy{ + revProxy := &httputil.ReverseProxy{ Director: func(req *http.Request) { // Replacing the `req.URL` host with ETL container host req.URL.Scheme = transformerURL.Scheme @@ -147,40 +125,33 @@ func makeCommunicator(args commArgs) Communicator { } }, } - baseComm.commType = Hrev - return &revProxyComm{baseComm: baseComm, rp: rp, uri: args.boot.uri} - case HpushStdin: - baseComm.commType = HpushStdin - return &pushComm{ - baseComm: baseComm, - mem: args.boot.t.PageMM(), - uri: args.boot.uri, - command: args.boot.originalCommand, - } - default: - debug.Assert(false, args.boot.msg.CommTypeX) + rp.rp = revProxy + return rp } + + debug.Assert(false, "unknown comm-type '"+boot.msg.CommTypeX+"'") return nil } -func (c *baseComm) Name() string { return c.name } -func (c *baseComm) PodName() string { return c.podName } -func (c *baseComm) SvcName() string { return c.podName /*pod name is same as service name*/ } -func (c *baseComm) CommType() string { return c.commType } +func (c *baseComm) Name() string { return c.boot.originalPodName } +func (c *baseComm) PodName() string { return c.boot.pod.Name } +func (c *baseComm) SvcName() string { return c.boot.pod.Name /*same as pod name*/ } + +func (c *baseComm) ListenSmapChanged() { c.listener.ListenSmapChanged() } func (c *baseComm) String() string { - return fmt.Sprintf("%s[%s]-%s", c.name, c.xctn.ID(), c.commType) + return fmt.Sprintf("%s[%s]-%s", c.boot.originalPodName, c.boot.xctn.ID(), c.boot.msg.CommTypeX) } -func (c *baseComm) Xact() cluster.Xact { return c.xctn } -func (c *baseComm) ObjCount() int64 { return c.xctn.Objs() } -func (c *baseComm) InBytes() int64 { return c.xctn.InBytes() } -func (c *baseComm) OutBytes() int64 { return c.xctn.OutBytes() } +func (c *baseComm) Xact() cluster.Xact { return c.boot.xctn } +func (c *baseComm) ObjCount() int64 { return c.boot.xctn.Objs() } +func (c *baseComm) InBytes() int64 { return c.boot.xctn.InBytes() } +func (c *baseComm) OutBytes() int64 { return c.boot.xctn.OutBytes() } -func (c *baseComm) Stop() { c.xctn.Finish() } +func (c *baseComm) Stop() { c.boot.xctn.Finish() } func (c *baseComm) getWithTimeout(url string, size int64, timeout time.Duration) (r cos.ReadCloseSizer, err error) { - if err := c.xctn.AbortErr(); err != nil { + if err := c.boot.xctn.AbortErr(); err != nil { return nil, err } @@ -197,7 +168,7 @@ func (c *baseComm) getWithTimeout(url string, size int64, timeout time.Duration) req, err = http.NewRequest(http.MethodGet, url, http.NoBody) } if err == nil { - resp, err = c.t.DataClient().Do(req) //nolint:bodyclose // Closed by the caller. + resp, err = c.boot.t.DataClient().Do(req) //nolint:bodyclose // Closed by the caller. } if err != nil { if cancel != nil { @@ -209,44 +180,46 @@ func (c *baseComm) getWithTimeout(url string, size int64, timeout time.Duration) return cos.NewReaderWithArgs(cos.ReaderArgs{ R: resp.Body, Size: resp.ContentLength, - ReadCb: func(n int, err error) { c.xctn.InObjsAdd(0, int64(n)) }, + ReadCb: func(n int, err error) { c.boot.xctn.InObjsAdd(0, int64(n)) }, DeferCb: func() { if cancel != nil { cancel() } - c.xctn.InObjsAdd(1, 0) - c.xctn.OutObjsAdd(1, size) // see also: `coi.objsAdd` + c.boot.xctn.InObjsAdd(1, 0) + c.boot.xctn.OutObjsAdd(1, size) // see also: `coi.objsAdd` }, }), nil } ////////////// -// pushComm // +// pushComm: implements (Hpush | HpushStdin) ////////////// func (pc *pushComm) doRequest(bck *meta.Bck, lom *cluster.LOM, timeout time.Duration) (r cos.ReadCloseSizer, err error) { if err := lom.InitBck(bck.Bucket()); err != nil { return nil, err } - r, err = pc.tryDoRequest(lom, timeout) + + lom.Lock(false) + r, err = pc.do(lom, timeout) + lom.Unlock(false) + if err != nil && cmn.IsObjNotExist(err) && bck.IsRemote() { - _, err = pc.t.GetCold(context.Background(), lom, cmn.OwtGetLock) + _, err = pc.boot.t.GetCold(context.Background(), lom, cmn.OwtGetLock) if err != nil { return nil, err } - r, err = pc.tryDoRequest(lom, timeout) + lom.Lock(false) + r, err = pc.do(lom, timeout) + lom.Unlock(false) } return } -func (pc *pushComm) tryDoRequest(lom *cluster.LOM, timeout time.Duration) (cos.ReadCloseSizer, error) { - if err := pc.xctn.AbortErr(); err != nil { +func (pc *pushComm) do(lom *cluster.LOM, timeout time.Duration) (cos.ReadCloseSizer, error) { + if err := pc.boot.xctn.AbortErr(); err != nil { return nil, err } - - lom.Lock(false) - defer lom.Unlock(false) - if err := lom.Load(false /*cache it*/, true /*locked*/); err != nil { return nil, err } @@ -259,11 +232,16 @@ func (pc *pushComm) tryDoRequest(lom *cluster.LOM, timeout time.Duration) (cos.R } var ( + cancel func() req *http.Request resp *http.Response - cancel func() - url = pc.uri + "/" + lom.Bck().Name + "/" + lom.ObjName + url = pc.boot.uri + "/" + lom.Bck().Name + "/" + lom.ObjName ) + + debug.Assert(lom.Bck().Ns.IsGlobal(), lom.Bck().Ns.String()) // the url (above) simplifies out bucket's namespace + + // TODO -- FIXME: switch(ArgType) + if timeout != 0 { var ctx context.Context ctx, cancel = context.WithTimeout(context.Background(), timeout) @@ -282,7 +260,7 @@ func (pc *pushComm) tryDoRequest(lom *cluster.LOM, timeout time.Duration) (cos.R } req.ContentLength = size req.Header.Set(cos.HdrContentType, cos.ContentBinary) - resp, err = pc.t.DataClient().Do(req) //nolint:bodyclose // Closed by the caller. + resp, err = pc.boot.t.DataClient().Do(req) //nolint:bodyclose // Closed by the caller. finish: if err != nil { if cancel != nil { @@ -294,13 +272,13 @@ finish: return cos.NewReaderWithArgs(cos.ReaderArgs{ R: resp.Body, Size: resp.ContentLength, - ReadCb: func(n int, err error) { pc.xctn.InObjsAdd(0, int64(n)) }, + ReadCb: func(n int, err error) { pc.boot.xctn.InObjsAdd(0, int64(n)) }, DeferCb: func() { if cancel != nil { cancel() } - pc.xctn.InObjsAdd(1, 0) - pc.xctn.OutObjsAdd(1, size) // see also: `coi.objsAdd` + pc.boot.xctn.InObjsAdd(1, 0) + pc.boot.xctn.OutObjsAdd(1, size) // see also: `coi.objsAdd` }, }), nil } @@ -312,18 +290,19 @@ func (pc *pushComm) InlineTransform(w http.ResponseWriter, _ *http.Request, bck if err != nil { return err } - if pc.config.FastV(5, cos.SmoduleETL) { + if pc.boot.config.FastV(5, cos.SmoduleETL) { nlog.Infoln(Hpush, lom.Cname(), err) } - defer r.Close() size := r.Size() if size < 0 { size = memsys.DefaultBufSize // TODO: track the average } - buf, slab := pc.mem.AllocSize(size) + buf, slab := pc.boot.t.PageMM().AllocSize(size) _, err = io.CopyBuffer(w, r, buf) + slab.Free(buf) + r.Close() return err } @@ -331,18 +310,18 @@ func (pc *pushComm) OfflineTransform(bck *meta.Bck, objName string, timeout time lom := cluster.AllocLOM(objName) r, err = pc.doRequest(bck, lom, timeout) cluster.FreeLOM(lom) - if err == nil && pc.config.FastV(5, cos.SmoduleETL) { + if err == nil && pc.boot.config.FastV(5, cos.SmoduleETL) { nlog.Infoln(Hpush, lom.Cname(), err) } return } ////////////////// -// redirectComm // +// redirectComm: implements Hpull ////////////////// func (rc *redirectComm) InlineTransform(w http.ResponseWriter, r *http.Request, bck *meta.Bck, objName string) error { - if err := rc.xctn.AbortErr(); err != nil { + if err := rc.boot.xctn.AbortErr(); err != nil { return err } @@ -350,12 +329,12 @@ func (rc *redirectComm) InlineTransform(w http.ResponseWriter, r *http.Request, if err != nil { return err } - rc.xctn.OutObjsAdd(1, size) + rc.boot.xctn.OutObjsAdd(1, size) // is there a way to determine `rc.stats.outBytes`? - redirectURL := cos.JoinPath(rc.uri, transformerPath(bck, objName)) + redirectURL := cos.JoinPath(rc.boot.uri, transformerPath(bck, objName)) http.Redirect(w, r, redirectURL, http.StatusTemporaryRedirect) - if rc.config.FastV(5, cos.SmoduleETL) { + if rc.boot.config.FastV(5, cos.SmoduleETL) { nlog.Infoln(Hpull, bck.Cname(objName)) } return nil @@ -366,16 +345,16 @@ func (rc *redirectComm) OfflineTransform(bck *meta.Bck, objName string, timeout if errV != nil { return nil, errV } - etlURL := cos.JoinPath(rc.uri, transformerPath(bck, objName)) + etlURL := cos.JoinPath(rc.boot.uri, transformerPath(bck, objName)) r, err := rc.getWithTimeout(etlURL, size, timeout) - if rc.config.FastV(5, cos.SmoduleETL) { + if rc.boot.config.FastV(5, cos.SmoduleETL) { nlog.Infoln(Hpull, bck.Cname(objName), err) } return r, err } ////////////////// -// revProxyComm // +// revProxyComm: implements Hrev ////////////////// func (rp *revProxyComm) InlineTransform(w http.ResponseWriter, r *http.Request, bck *meta.Bck, objName string) error { @@ -383,7 +362,7 @@ func (rp *revProxyComm) InlineTransform(w http.ResponseWriter, r *http.Request, if err != nil { return err } - rp.xctn.OutObjsAdd(1, size) + rp.boot.xctn.OutObjsAdd(1, size) // is there a way to determine `rc.stats.outBytes`? path := transformerPath(bck, objName) @@ -398,9 +377,9 @@ func (rp *revProxyComm) OfflineTransform(bck *meta.Bck, objName string, timeout if errV != nil { return nil, errV } - etlURL := cos.JoinPath(rp.uri, transformerPath(bck, objName)) + etlURL := cos.JoinPath(rp.boot.uri, transformerPath(bck, objName)) r, err := rp.getWithTimeout(etlURL, size, timeout) - if rp.config.FastV(5, cos.SmoduleETL) { + if rp.boot.config.FastV(5, cos.SmoduleETL) { nlog.Infoln(Hrev, bck.Cname(objName), err) } return r, err diff --git a/ext/etl/emd.go b/ext/etl/emd.go index ecf7bc5528..75e5d208c0 100644 --- a/ext/etl/emd.go +++ b/ext/etl/emd.go @@ -84,7 +84,7 @@ func (e *MD) MarshalJSON() ([]byte, error) { Ext: e.Ext, } for k, v := range e.ETLs { - jsonMD.ETLs[k] = jsonETL{v.Type(), cos.MustMarshal(v)} + jsonMD.ETLs[k] = jsonETL{v.MsgType(), cos.MustMarshal(v)} } return jsoniter.Marshal(jsonMD) } diff --git a/ext/etl/transform.go b/ext/etl/transform.go index 6e9ef38515..c24d8a6ba8 100644 --- a/ext/etl/transform.go +++ b/ext/etl/transform.go @@ -191,8 +191,6 @@ func InitCode(t cluster.Target, msg *InitCodeMsg, xid string) error { } // generate (from => to) replacements -// -//nolint:gocritic // appendCombine vs readability func fromToPairs(msg *InitCodeMsg) (ftp []string) { var ( chunk string @@ -200,27 +198,19 @@ func fromToPairs(msg *InitCodeMsg) (ftp []string) { name = msg.IDX ) ftp = make([]string, 0, 16) - ftp = append(ftp, "", name) - ftp = append(ftp, "", msg.CommTypeX) + ftp = append(ftp, "", name, "", msg.CommTypeX, "", msg.ArgTypeX) - // chunk == 0 means no chunks (and no streaming) - in other words, - // reading the entire payload in memory, and then transforming in one shot + // chunk == 0 means no chunks (and no streaming) - ie., + // reading the entire payload in memory and then transforming in one shot if msg.ChunkSize > 0 { chunk = "\"" + strconv.FormatInt(msg.ChunkSize, 10) + "\"" } ftp = append(ftp, "", chunk) - if msg.CommTypeX == Hpull && msg.TransformURL { - ftp = append(ftp, "", "url") - } - if msg.Flags > 0 { flags = "\"" + strconv.FormatInt(msg.Flags, 10) + "\"" } - ftp = append(ftp, "", flags) - - // functions - ftp = append(ftp, "", msg.Funcs.Transform) + ftp = append(ftp, "", flags, "", msg.Funcs.Transform) switch msg.CommTypeX { case Hpush, Hpull, Hrev: @@ -299,14 +289,11 @@ func start(t cluster.Target, msg *InitSpecMsg, xid string, opts StartOpts, confi boot.setupXaction(xid) // finally, add Communicator to the runtime registry - c := makeCommunicator(commArgs{ - listener: newAborter(t, msg.IDX), - boot: boot, - }) - if err = reg.add(msg.IDX, c); err != nil { + comm := newCommunicator(newAborter(t, msg.IDX), boot) + if err = reg.add(msg.IDX, comm); err != nil { return } - t.Sowner().Listeners().Reg(c) + t.Sowner().Listeners().Reg(comm) return }