Skip to content

Commit

Permalink
Merge branch 'master' into INLONG-11005
Browse files Browse the repository at this point in the history
# Conflicts:
#	inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoderFactory.java
  • Loading branch information
emptyOVO committed Sep 19, 2024
2 parents 3d5818f + c4cbc10 commit 0d3464a
Show file tree
Hide file tree
Showing 65 changed files with 985 additions and 173 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,18 @@
import lombok.EqualsAndHashCode;
import lombok.ToString;

import java.util.List;

@Data
@ToString(callSuper = true)
@EqualsAndHashCode(callSuper = true)
@JsonTypeDefine(value = SourceType.FILE)
@ApiModel(value = "File data add task request")
public class FileDataAddTaskRequest extends DataAddTaskRequest {

@ApiModelProperty("filterStreams")
private List<String> filterStreams;

@ApiModelProperty("Start time")
private Long startTime;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ public class FileSource extends StreamSource {
@ApiModelProperty("End time")
private Long endTime;

@ApiModelProperty("filterStreams")
private List<String> filterStreams;

public FileSource() {
this.setSourceType(SourceType.FILE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ public class FileSourceDTO {
@ApiModelProperty(value = "Audit version")
private String auditVersion;

@ApiModelProperty("filterStreams")
private List<String> filterStreams;

public static FileSourceDTO getFromRequest(@NotNull FileSourceRequest fileSourceRequest, String extParams) {
FileSourceDTO dto = StringUtils.isNotBlank(extParams)
? FileSourceDTO.getFromJson(extParams)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ public class FileSourceRequest extends SourceRequest {
@ApiModelProperty("End time")
private Long endTime;

@ApiModelProperty("filterStreams")
private List<String> filterStreams;

public FileSourceRequest() {
this.setSourceType(SourceType.FILE);
this.setSerializationType(DataFormat.CSV.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,6 @@ default Boolean updateAfterApprove(String operator) {
* @param operator Operator's name.
* @return source id list after saving.
*/
List<Integer> batchAddDataAddTask(String groupId, String streamId, List<DataAddTaskRequest> requestList,
List<Integer> batchAddDataAddTask(String groupId, List<DataAddTaskRequest> requestList,
String operator);
}
Original file line number Diff line number Diff line change
Expand Up @@ -537,10 +537,10 @@ public Integer addDataAddTask(DataAddTaskRequest request, String operator) {
}

@Override
public List<Integer> batchAddDataAddTask(String groupId, String streamId, List<DataAddTaskRequest> requestList,
public List<Integer> batchAddDataAddTask(String groupId, List<DataAddTaskRequest> requestList,
String operator) {
List<Integer> result = new ArrayList<>();
String auditVersion = String.valueOf(sourceMapper.selectDataAddTaskCount(groupId, streamId));
String auditVersion = String.valueOf(sourceMapper.selectDataAddTaskCount(groupId, null));
for (DataAddTaskRequest request : requestList) {
request.setAuditVersion(auditVersion);
int id = addDataAddTask(request, operator);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,13 +119,19 @@ public Integer addDataAddTask(DataAddTaskRequest request, String operator) {
dto.setStartTime(sourceRequest.getStartTime());
dto.setEndTime(sourceRequest.getEndTime());
dto.setRetry(true);
dto.setAuditVersion(request.getAuditVersion());
dto.setFilterStreams(sourceRequest.getFilterStreams());
StreamSourceEntity dataAddTaskEntity =
CommonBeanUtils.copyProperties(sourceEntity, StreamSourceEntity::new);
dataAddTaskEntity.setId(null);
dataAddTaskEntity.setSourceName(sourceEntity.getSourceName() + "-" + (dataAddTaskSize + 1));
dataAddTaskEntity.setExtParams(objectMapper.writeValueAsString(dto));
dataAddTaskEntity.setTaskMapId(sourceEntity.getId());
return sourceMapper.insert(dataAddTaskEntity);
Integer id = sourceMapper.insert(dataAddTaskEntity);
SourceRequest dataAddTaskRequest =
CommonBeanUtils.copyProperties(dataAddTaskEntity, SourceRequest::new, true);
updateAgentTaskConfig(dataAddTaskRequest, operator);
return id;
} catch (Exception e) {
LOGGER.error("serialize extParams of File SourceDTO failure: ", e);
throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,16 +130,13 @@ public Response<Boolean> forceDelete(@RequestParam String inlongGroupId, @Reques
sourceService.forceDelete(inlongGroupId, inlongStreamId, LoginUserUtils.getLoginUser().getName()));
}

@RequestMapping(value = "/source/addDataAddTask/{groupId}/{streamId}", method = RequestMethod.POST)
@RequestMapping(value = "/source/addDataAddTask/{groupId}", method = RequestMethod.POST)
@ApiOperation(value = "Add supplementary recording task for stream source")
@ApiImplicitParams({
@ApiImplicitParam(name = "groupId", dataTypeClass = String.class, required = true),
@ApiImplicitParam(name = "streamId", dataTypeClass = String.class, required = true)
})
public Response<List<Integer>> addSub(@PathVariable String groupId, @PathVariable String streamId,
@ApiImplicitParam(name = "groupId", dataTypeClass = String.class, required = true)
public Response<List<Integer>> addSub(@PathVariable String groupId,
@RequestBody List<DataAddTaskRequest> requestList) {
return Response.success(sourceService.batchAddDataAddTask(groupId, streamId, requestList,
LoginUserUtils.getLoginUser().getName()));
return Response.success(
sourceService.batchAddDataAddTask(groupId, requestList, LoginUserUtils.getLoginUser().getName()));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,40 +29,31 @@
public class HttpClientExample {

public static void main(String[] args) {
/*
* 1. if 'isLocalVisit' is true use local config from file in ${configBasePath} directory/${dataProxyGroupId}
* .local such as : configBasePath = /data/inlong/dataproxy/conf dataProxyGroupId = test so config file is :
* /data/inlong/dataproxy/conf/test.local and config context like this:
* {"isInterVisit":1,"clusterId":"1","size":1,"switch":1,"address":[{"host":"127.0.0
* .1","port":"46802"},{"host":"127.0.0.1","port":"46802"}]}
*
* 2. if 'isLocalVisit' is false sdk will get config from manager auto.
*/
String inlongGroupId = "test_group_id";
String inlongStreamId = "test_stream_id";
String configBasePath = "/data/inlong/dataproxy/conf";
String configBasePath = "";
String inLongManagerAddr = "127.0.0.1";
String inLongManagerPort = "8080";
String inLongManagerPort = "8083";
String localIP = "127.0.0.1";
String messageBody = "inlong message body!";

HttpProxySender sender = getMessageSender(localIP, inLongManagerAddr,
inLongManagerPort, inlongGroupId, false, false,
inLongManagerPort, inlongGroupId, true, false,
configBasePath);

sendHttpMessage(sender, inlongGroupId, inlongStreamId, messageBody);
sender.close(); // close the sender
}

public static HttpProxySender getMessageSender(String localIP, String inLongManagerAddr,
String inLongManagerPort, String inlongGroupId,
boolean isLocalVisit, boolean isReadProxyIPFromLocal,
boolean requestByHttp, boolean isReadProxyIPFromLocal,
String configBasePath) {
ProxyClientConfig proxyConfig = null;
HttpProxySender sender = null;
try {
proxyConfig = new ProxyClientConfig(localIP, isLocalVisit, inLongManagerAddr,
proxyConfig = new ProxyClientConfig(localIP, requestByHttp, inLongManagerAddr,
Integer.valueOf(inLongManagerPort),
inlongGroupId, "test", "123456");
inlongGroupId, "admin", "inlong");// user and password of manager
proxyConfig.setInlongGroupId(inlongGroupId);
proxyConfig.setConfStoreBasePath(configBasePath);
proxyConfig.setReadProxyIPFromLocal(isReadProxyIPFromLocal);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,22 +40,12 @@ public class TcpClientExample {
*/
public static void main(String[] args) throws InterruptedException {

String inlongGroupId = "test_test";
String inlongStreamId = "test_test";

/*
* 1. if isLocalVisit is true, will get dataproxy server info from local file in
* ${configBasePath}/${inlongGroupId}.local file
*
* for example: /data/inlong/config/test_test.local and file context like this:
* {"isInterVisit":1,"clusterId":"1","size":1,"switch":1,"address":[{"host":"127.0.0.1",
* "port":"46802"},{"host":"127.0.0.1","port":"46802"}]} 2. if isLocalVisit is false, will get dataproxy server
* info from manager so we must ensure that the manager server url is configured correctly!
*/
String configBasePath = "/data/inlong/config";
String inlongGroupId = "test_group_id";
String inlongStreamId = "test_stream_id";

String configBasePath = "";
String inLongManagerAddr = "127.0.0.1";
String inLongManagerPort = "8000";
String inLongManagerPort = "8083";

/*
* It is recommended to use type 7. For others, please refer to the official related documents
Expand All @@ -66,19 +56,20 @@ public static void main(String[] args) throws InterruptedException {
TcpClientExample tcpClientExample = new TcpClientExample();
DefaultMessageSender sender = tcpClientExample
.getMessageSender(localIP, inLongManagerAddr, inLongManagerPort,
inlongGroupId, false, false, configBasePath, msgType);
inlongGroupId, true, false, configBasePath, msgType);
tcpClientExample.sendTcpMessage(sender, inlongGroupId, inlongStreamId,
messageBody, System.currentTimeMillis());
sender.close(); // close the sender
}

public DefaultMessageSender getMessageSender(String localIP, String inLongManagerAddr, String inLongManagerPort,
String inlongGroupId, boolean isLocalVisit, boolean isReadProxyIPFromLocal,
String inlongGroupId, boolean requestByHttp, boolean isReadProxyIPFromLocal,
String configBasePath, int msgType) {
ProxyClientConfig dataProxyConfig = null;
DefaultMessageSender messageSender = null;
try {
dataProxyConfig = new ProxyClientConfig(localIP, isLocalVisit, inLongManagerAddr,
Integer.valueOf(inLongManagerPort), inlongGroupId, "test", "123456");
dataProxyConfig = new ProxyClientConfig(localIP, requestByHttp, inLongManagerAddr,
Integer.valueOf(inLongManagerPort), inlongGroupId, "admin", "inlong");
if (StringUtils.isNotEmpty(configBasePath)) {
dataProxyConfig.setConfStoreBasePath(configBasePath);
}
Expand All @@ -98,9 +89,11 @@ public void sendTcpMessage(DefaultMessageSender sender, String inlongGroupId,
try {
result = sender.sendMessage(messageBody.getBytes("utf8"), inlongGroupId, inlongStreamId,
0, String.valueOf(dt), 20, TimeUnit.SECONDS);

} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
System.out.println("messageSender" + result);
logger.info("messageSender {}", result);
}

Expand Down
4 changes: 4 additions & 0 deletions inlong-sdk/transform-sdk/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@
<artifactId>protobuf-java-util</artifactId>
<version>${protobuf.version}</version>
</dependency>
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>bson</artifactId>
</dependency>
<dependency>
<groupId>org.reflections</groupId>
<artifactId>reflections</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.transform.decode;

import com.google.gson.JsonArray;
import com.google.gson.JsonObject;

/**
* BsonSourceData
*/
public class BsonSourceData extends JsonSourceData {

public BsonSourceData(JsonObject root, JsonArray childRoot) {
super(root, childRoot);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.transform.decode;

import org.apache.inlong.sdk.transform.pojo.BsonSourceInfo;
import org.apache.inlong.sdk.transform.process.Context;

import lombok.extern.slf4j.Slf4j;
import org.bson.BsonDocument;
import org.bson.RawBsonDocument;
import org.bson.json.JsonMode;
import org.bson.json.JsonWriterSettings;

import java.math.BigDecimal;

/**
* BsonSourceDecoder
*/
@Slf4j
public class BsonSourceDecoder implements SourceDecoder<byte[]> {

private final JsonSourceDecoder decoder;

public BsonSourceDecoder(BsonSourceInfo sourceInfo) {
decoder = new JsonSourceDecoder(sourceInfo);
}

@Override
public SourceData decode(byte[] srcBytes, Context context) {
return decoder.decode(parse(srcBytes), context);
}

public String parse(byte[] bsonData) {
try {
RawBsonDocument rawBsonDocument = new RawBsonDocument(bsonData);
JsonWriterSettings writerSettings = JsonWriterSettings.builder()
.outputMode(JsonMode.RELAXED)
.decimal128Converter((value, writer) -> writer.writeNumber(value.bigDecimalValue().toPlainString()))
.doubleConverter((value, writer) -> writer.writeNumber(new BigDecimal(value).toString()))
.build();
BsonDocument bsonDocument = BsonDocument.parse(rawBsonDocument.toJson(writerSettings));
bsonDocument.remove("_id");
return bsonDocument.toJson();
} catch (Exception e) {
log.error(e.getMessage(), e);
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.inlong.sdk.transform.decode;

import org.apache.inlong.sdk.transform.pojo.AvroSourceInfo;
import org.apache.inlong.sdk.transform.pojo.BsonSourceInfo;
import org.apache.inlong.sdk.transform.pojo.CsvSourceInfo;
import org.apache.inlong.sdk.transform.pojo.JsonSourceInfo;
import org.apache.inlong.sdk.transform.pojo.KvSourceInfo;
Expand Down Expand Up @@ -45,6 +46,9 @@ public static PbSourceDecoder createPbDecoder(PbSourceInfo sourceInfo) {
public static AvroSourceDecoder createAvroDecoder(AvroSourceInfo sourceInfo) {
return new AvroSourceDecoder(sourceInfo);
}
public static BsonSourceDecoder createBsonDecoder(BsonSourceInfo sourceInfo) {
return new BsonSourceDecoder(sourceInfo);
}

public static YamlSourceDecoder createYamlDecoder(YamlSourceInfo sourceInfo) {
return new YamlSourceDecoder(sourceInfo);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.transform.pojo;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;

/**
* BsonSourceInfo
*/
@JsonIgnoreProperties(ignoreUnknown = true)
public class BsonSourceInfo extends JsonSourceInfo {

@JsonCreator
public BsonSourceInfo(
@JsonProperty("charset") String charset,
@JsonProperty("rowsNodePath") String rowsNodePath) {
super(charset, rowsNodePath);
}
}
Loading

0 comments on commit 0d3464a

Please sign in to comment.