Skip to content

Latest commit

 

History

History
160 lines (146 loc) · 3.73 KB

gettingstarted.adoc

File metadata and controls

160 lines (146 loc) · 3.73 KB

Getting started

Use cases

REST API

Specification DSL

---
# Example specification with two functions: "loop" (type "Paginate") which
# executes "page" (type "Get") repeatedly, and "page" which fetches one feed
# page and runs it through a list of pipes.
# NOTE(review): the runtime meaning of each "type" is inferred from its name
# and from the Java builder example in this document - confirm against the DSL reference.
type: "Specification"
name: ""
# Refers to the "loop" entry under `functions` below.
startFunctionId: "loop"
configure:
- type: "SpecificationContext"
  variables:
    # Initial value of nextPosition comes from an expression evaluated by the engine.
    # presumably: resume from the last stored position, otherwise start at 1 - TODO confirm
    nextPosition: "${contentStream.lastOrInitialPosition(1)}"
  globalState:
    # Corresponds to `.topic("topic")` in the Java builder example.
    global.topic: "topic"
functions:
  loop:
    type: "Paginate"
    id: "loop"
    variables:
      # Passes the current nextPosition to the page request as fromPosition.
      fromPosition: "${nextPosition}"
    # Written as `.prefetchThreshold(15)` in the Java builder example.
    threshold: 15
    addPageContent: true
    iterate:
    # Each iteration executes the "page" function defined below.
    - type: "Execute"
      executeId: "page"
    until:
      # presumably stops paginating once "page" no longer yields a nextPosition - TODO confirm
      type: "ConditionWhenVariableIsNull"
      identifier: "nextPosition"
  page:
    type: "Get"
    id: "page"
    # fromPosition is substituted into the `pos` query parameter; page size is fixed at 10.
    url: "http://example.com/feed?pos=${fromPosition}&size=10"
    # Variable names handed back to the calling function ("loop" tests nextPosition in `until`).
    returnVariables:
    - "nextPosition"
    responseValidators:
    # Only HTTP status 200 is listed as a success status.
    - type: "HttpStatusValidation"
      success:
      - 200
    pipes:
    # Pipe 1: split the response on /feed/entry; /entry/id is the query applied per entry.
    # NOTE(review): looks like this declares the expected ordering of positions - confirm.
    - type: "Sequence"
      splitQuery:
        type: "QueryXPath"
        expression: "/feed/entry"
      expectedQuery:
        type: "QueryXPath"
        expression: "/entry/id"
    # Pipe 2: derive nextPosition from the feed's rel="next" link.
    - type: "NextPage"
      outputs:
        nextPosition:
          type: "QueryRegEx"
          # Lookbehind for "?pos=" or "&pos=", then match everything up to the next "&".
          expression: "(?<=[?&]pos=)[^&]*"
          query:
            # Selects the href attribute of the /feed/link element whose rel is "next".
            type: "QueryXPath"
            expression: "/feed/link[@rel=\"next\"]/@href"
    # Pipe 3: for each /feed/entry, bind `position` to /entry/id and run the nested pipes.
    - type: "Parallel"
      variables:
        position:
          type: "QueryXPath"
          expression: "/entry/id"
      splitQuery:
        type: "QueryXPath"
        expression: "/feed/entry"
      pipes:
      # Adds the entry under content key "entry" at the given position.
      - type: "AddContent"
        positionVariableExpression: "${position}"
        contentKey: "entry"
      # Publishes the given position.
      - type: "Publish"
        positionVariableExpression: "${position}"

Execute specification

PUT

/task

Request Body

SPECIFICATION_PAYLOAD

Response codes

201

Job started

400

Bad request

404

Resource not found

Library (embed in your own application)

Specification DSL

// Builds the same specification as the YAML example in this document: an unnamed
// specification whose start function id is "loop".
// NOTE(review): the static factory methods used here (context, paginate, get, xpath, ...)
// are imported from elsewhere; their exact semantics are not visible in this snippet.
SpecificationBuilder feedBuilder = Specification.start("", "loop")
        // Context: sets the topic and the initial nextPosition expression.
        .configure(context()
                .topic("topic")
                .variable("nextPosition", "${contentStream.lastOrInitialPosition(1)}")
        )
        // "loop": paginate by executing "page" until nextPosition is null.
        .function(paginate("loop")
                // Passes the current nextPosition to "page" as fromPosition.
                .variable("fromPosition", "${nextPosition}")
                .addPageContent()
                .iterate(execute("page"))
                // Appears as `threshold: 15` in the YAML form.
                .prefetchThreshold(15)
                .until(whenVariableIsNull("nextPosition"))
        )
        // "page": GET one feed page starting at fromPosition with a fixed size of 10.
        .function(get("page")
                .url("http://example.com/feed?pos=${fromPosition}&size=10")
                // Only HTTP status 200 is listed as success.
                .validate(status().success(200))
                // Split on /feed/entry; /entry/id is the expected query per entry.
                .pipe(sequence(xpath("/feed/entry"))
                        .expected(xpath("/entry/id"))
                )
                // nextPosition = value of the `pos` query parameter in the rel="next" link's href
                // (regex: lookbehind for "?pos=" or "&pos=", then everything up to the next "&").
                .pipe(nextPage()
                        .output("nextPosition", regex(xpath("/feed/link[@rel=\"next\"]/@href"), "(?<=[?&]pos=)[^&]*"))
                )
                // For each /feed/entry: bind position to /entry/id, add the entry under
                // content key "entry", then publish that position.
                .pipe(parallel(xpath("/feed/entry"))
                        .variable("position", xpath("/entry/id"))
                        .pipe(addContent("${position}", "entry"))
                        .pipe(publish("${position}"))
                )
                // Hands nextPosition back to the caller; "loop" tests it in until(...).
                .returnVariables("nextPosition")
        );

Execute specification

// Builds a Worker for the feedBuilder specification and runs it.
Worker.newBuilder()
        // Configuration keys select the "rawdata" content stream connector and the
        // "memory" rawdata client provider.
        // NOTE(review): presumably an in-memory store suited to local trials - confirm.
        .configuration(Map.of(
                "content.stream.connector", "rawdata",
                "rawdata.client.provider", "memory")
        )
        .specification(feedBuilder)
        // presumably prints the resolved execution plan before running - TODO confirm
        .printExecutionPlan()
        .build()
        .run();