Skip to content

Commit

Permalink
etl: refactor and cleanup construction
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Aug 23, 2023
1 parent 43ad47f commit 2390f7f
Show file tree
Hide file tree
Showing 6 changed files with 191 additions and 197 deletions.
5 changes: 3 additions & 2 deletions ais/test/etl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -807,9 +807,10 @@ func TestETLHealth(t *testing.T) {
time.Sleep(10 * time.Second)
}

// TODO -- FIXME: see health handlers returning "OK" - revisit
for _, msg := range healths {
tassert.Errorf(t, msg.Status == etl.HealthStatusRunning, "Expected pod at %s to be %q, got %q",
meta.Tname(msg.TargetID), etl.HealthStatusRunning, msg.Status)
tassert.Errorf(t, msg.Status == "Running", "Expected pod at %s to be running, got %q",
meta.Tname(msg.TargetID), msg.Status)
}
}

Expand Down
157 changes: 92 additions & 65 deletions ext/etl/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,47 +25,68 @@ const (
Code = "code"
)

const HealthStatusRunning = "Running" // TODO: add the full enum, if exists
// enum communication types (`commTypes`)
const (
// ETL container receives POST request from target with the data. It
// must read the data and return response to the target which then will be
// transferred to the client.
Hpush = "hpush://"
// Target redirects the GET request to the ETL container. Then ETL container
// contacts the target via `AIS_TARGET_URL` env variable to get the data.
// The data is then transformed and returned to the client.
Hpull = "hpull://"
// Similar to redirection strategy but with usage of reverse proxy.
Hrev = "hrev://"
// Stdin/stdout communication.
HpushStdin = "io://"
)

// enum arg types (`argTypes`)
const (
ArgTypeDefault = ""
ArgTypeURL = "url"
ArgTypeFQN = "fqn"
)

type (
InitMsg interface {
Name() string
Type() string // Code or Spec
MsgType() string // Code or Spec
CommType() string
ArgType() string
Validate() error
String() string
}

// and implementations
InitMsgBase struct {
IDX string `json:"id"`
CommTypeX string `json:"communication"`
IDX string `json:"id"` // etlName (not to be confused)
CommTypeX string `json:"communication"` // enum commTypes
ArgTypeX string `json:"argument"` // enum argTypes
Timeout cos.Duration `json:"timeout"`
}
InitSpecMsg struct {
InitMsgBase
Spec []byte `json:"spec"` // NOTE: eq. `Spec`
Spec []byte `json:"spec"`
}

InitCodeMsg struct {
InitMsgBase
Code []byte `json:"code"` // NOTE: eq. `Code`
Deps []byte `json:"dependencies"`
Runtime string `json:"runtime"`
TransformURL bool `json:"transform_url"`
Code []byte `json:"code"`
Deps []byte `json:"dependencies"`
Runtime string `json:"runtime"`
// ========================================================================================
// `InitCodeMsg` carries the name of the transforming function;
// InitCodeMsg carries the name of the transforming function;
// the `Transform` function is mandatory and cannot be "" (empty) - it _will_ be called
// by the `Runtime` container (see etl/runtime/all.go for all supported pre-built runtimes);
// by the `Runtime` container (see etl/runtime/all.go for all supported pre-built runtimes);
// =========================================================================================
// TODO -- FIXME: decide if we need to remove nested struct for funcs
Funcs struct {
Transform string `json:"transform"` // cannot be omitted
}
// 0 (zero) - read the entire payload in memory and then transform it in one shot;
// > 0 - use chunk-size buffering and transform incrementally, one chunk at a time
ChunkSize int64 `json:"chunk_size"`
// bitwise flags: (streaming | debug | strict | ...)
// bitwise flags: (streaming | debug | strict | ...) future enhancements
Flags int64 `json:"flags"`
}
)
Expand Down Expand Up @@ -100,45 +121,35 @@ type (
}
)

const (
// ETL container receives POST request from target with the data. It
// must read the data and return response to the target which then will be
// transferred to the client.
Hpush = "hpush://"
// Target redirects the GET request to the ETL container. Then ETL container
// contacts the target via `AIS_TARGET_URL` env variable to get the data.
// The data is then transformed and returned to the client.
Hpull = "hpull://"
// Similar to redirection strategy but with usage of reverse proxy.
Hrev = "hrev://"
// Stdin/stdout communication.
HpushStdin = "io://"
var (
commTypes = []string{Hpush, Hpull, Hrev, HpushStdin} // NOTE: must contain all
argTypes = []string{ArgTypeDefault, ArgTypeURL, ArgTypeFQN} // ditto
)

var commTypes = []string{Hpush, Hpull, Hrev, HpushStdin} // NOTE: must contain all

////////////////
// InitMsg*** //
////////////////

// interface guards
// interface guard
var (
_ InitMsg = (*InitCodeMsg)(nil)
_ InitMsg = (*InitSpecMsg)(nil)
)

// TransformURL bool `json:"transform_url"` TODO -- FIXME

func (m InitMsgBase) CommType() string { return m.CommTypeX }
func (m InitMsgBase) ArgType() string { return m.ArgTypeX }
func (m InitMsgBase) Name() string { return m.IDX }

func (*InitCodeMsg) Type() string { return Code }
func (*InitSpecMsg) Type() string { return Spec }
func (*InitCodeMsg) MsgType() string { return Code }
func (*InitSpecMsg) MsgType() string { return Spec }

func (m *InitCodeMsg) String() string {
return fmt.Sprintf("init-%s[%s-%s-%s]", Code, m.IDX, m.CommTypeX, m.Runtime)
return fmt.Sprintf("init-%s[%s-%s-%s-%s]", Code, m.IDX, m.CommTypeX, m.ArgTypeX, m.Runtime)
}

func (m *InitSpecMsg) String() string {
return fmt.Sprintf("init-%s[%s-%s]", Spec, m.IDX, m.CommTypeX)
return fmt.Sprintf("init-%s[%s-%s-%s]", Spec, m.IDX, m.CommTypeX, m.ArgTypeX)
}

// TODO: double-take, unmarshaling-wise. To avoid, include (`Spec`, `Code`) in API calls
Expand All @@ -161,10 +172,37 @@ func UnmarshalInitMsg(b []byte) (msg InitMsg, err error) {
return
}

func (m *InitCodeMsg) Validate() error {
func (m *InitMsgBase) validate(detail string) error {
if err := k8s.ValidateEtlName(m.IDX); err != nil {
return fmt.Errorf("%v (%q, comm-type %q)", err, m.Runtime, m.CommTypeX)
return fmt.Errorf("%v [%s]", err, detail)
}
errCtx := &cmn.ETLErrCtx{ETLName: m.Name()}
if err := validateCommType(m.CommType()); err != nil {
return cmn.NewErrETL(errCtx, "%v [%s]", err, detail)
}

if !cos.StringInSlice(m.ArgTypeX, argTypes) {
err := fmt.Errorf("unsupported arg-type %q", m.ArgTypeX)
return cmn.NewErrETL(errCtx, "%v [%s]", err, detail)
}
if m.ArgTypeX == ArgTypeURL && m.CommTypeX != Hpull {
err := fmt.Errorf("arg-type %q requires comm-type %q (%q is not supported yet)", m.ArgTypeX, Hpull, m.CommTypeX)
return cmn.NewErrETL(errCtx, "%v [%s]", err, detail)
}

// NOTE: default comm-type
if m.CommType() == "" {
cos.Infof("Warning: empty comm-type, defaulting to %q", Hpush)
m.CommTypeX = Hpush
}
return nil
}

func (m *InitCodeMsg) Validate() error {
if err := m.InitMsgBase.validate(m.String()); err != nil {
return err
}

if len(m.Code) == 0 {
return fmt.Errorf("source code is empty (%q)", m.Runtime)
}
Expand All @@ -174,12 +212,7 @@ func (m *InitCodeMsg) Validate() error {
if _, ok := runtime.Get(m.Runtime); !ok {
return fmt.Errorf("unsupported runtime %q (supported: %v)", m.Runtime, runtime.GetNames())
}
if m.CommTypeX == "" {
cos.Infof("Warning: empty comm-type, defaulting to %q (%q)", Hpush, m.Runtime)
m.CommTypeX = Hpush
} else if !cos.StringInSlice(m.CommTypeX, commTypes) {
return fmt.Errorf("unsupported comm-type %q (%q)", m.CommTypeX, m.Runtime)
}

if m.Funcs.Transform == "" {
return fmt.Errorf("transform function cannot be empty (comm-type %q, funcs %+v)", m.CommTypeX, m.Funcs)
}
Expand All @@ -190,37 +223,18 @@ func (m *InitCodeMsg) Validate() error {
return nil
}

func ParsePodSpec(errCtx *cmn.ETLErrCtx, spec []byte) (*corev1.Pod, error) {
obj, _, err := scheme.Codecs.UniversalDeserializer().Decode(spec, nil, nil)
if err != nil {
return nil, cmn.NewErrETL(errCtx, "failed to parse pod spec: %v\n%q", err, string(spec))
}
pod, ok := obj.(*corev1.Pod)
if !ok {
kind := obj.GetObjectKind().GroupVersionKind().Kind
return nil, cmn.NewErrETL(errCtx, "expected pod spec, got: %s", kind)
}
return pod, nil
}

func (m *InitSpecMsg) Validate() (err error) {
if err := k8s.ValidateEtlName(m.IDX); err != nil {
if err := m.InitMsgBase.validate(m.String()); err != nil {
return err
}

errCtx := &cmn.ETLErrCtx{ETLName: m.Name()}

// Check pod specification constraints.
pod, err := ParsePodSpec(errCtx, m.Spec)
if err != nil {
return err
}

if err := validateCommType(m.CommType()); err != nil {
return cmn.NewErrETL(errCtx, err.Error())
}
if m.CommType() == "" {
m.CommTypeX = Hpush
}

// Check pod specification constraints.
if len(pod.Spec.Containers) != 1 {
err = cmn.NewErrETL(errCtx, "unsupported number of containers (%d), expected: 1", len(pod.Spec.Containers))
return
Expand Down Expand Up @@ -254,6 +268,19 @@ func (m *InitSpecMsg) Validate() (err error) {
return nil
}

func ParsePodSpec(errCtx *cmn.ETLErrCtx, spec []byte) (*corev1.Pod, error) {
obj, _, err := scheme.Codecs.UniversalDeserializer().Decode(spec, nil, nil)
if err != nil {
return nil, cmn.NewErrETL(errCtx, "failed to parse pod spec: %v\n%q", err, string(spec))
}
pod, ok := obj.(*corev1.Pod)
if !ok {
kind := obj.GetObjectKind().GroupVersionKind().Kind
return nil, cmn.NewErrETL(errCtx, "expected pod spec, got: %s", kind)
}
return pod, nil
}

//////////////
// InfoList //
//////////////
Expand Down
22 changes: 11 additions & 11 deletions ext/etl/comm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,19 +111,19 @@ var _ = Describe("CommunicatorTest", func() {
pod.SetName("somename")

xctn := mock.NewXact(apc.ActETLInline)
comm = makeCommunicator(commArgs{
boot: &etlBootstrapper{
t: tMock,
msg: InitSpecMsg{
InitMsgBase: InitMsgBase{
CommTypeX: commType,
},
boot := &etlBootstrapper{
t: tMock,
msg: InitSpecMsg{
InitMsgBase: InitMsgBase{
CommTypeX: commType,
},
pod: pod,
uri: transformerServer.URL,
xctn: xctn,
},
})
pod: pod,
uri: transformerServer.URL,
xctn: xctn,
}
comm = newCommunicator(nil, boot)

resp, err := http.Get(proxyServer.URL)
Expect(err).NotTo(HaveOccurred())
defer resp.Body.Close()
Expand Down
Loading

0 comments on commit 2390f7f

Please sign in to comment.