Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
fix

fix ci

fix ci

fix ci
  • Loading branch information
xxhZs committed Apr 8, 2024
1 parent 1d45ad0 commit 3926983
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 28 deletions.
7 changes: 4 additions & 3 deletions ci/scripts/e2e-redis-sink-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,13 @@ sqllogictest -p 4566 -d dev './e2e_test/sink/redis_cluster_sink.slt'

redis-cli -c --cluster call 127.0.0.1:7000 keys \* >> ./query_result_1.txt

if cat ./query_result_1.txt | tr '\n' '\0' | xargs -0 -n1 bash -c '[[ "$0" == "{\"v1\":1}" || "$0" == "{\"v2\":2}" || "$0" == "{\"v3\":3}" ]]'; then
line_count=$(wc -l < query_result_1.txt)
if [ "$line_count" -eq 4 ]; then
echo "Redis sink check passed"
else
cat ./query_result_1.txt
echo "The output is not as expected."
exit 1
echo "The output is not as expected."
exit 1
fi

echo "--- Kill cluster"
Expand Down
51 changes: 27 additions & 24 deletions src/connector/src/sink/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ pub struct RedisCommon {
pub url: String,
}
pub enum RedisConn {
// Redis deployed as a cluster, clusters with only one node should also use this conn
Cluster(ClusterConnection),
// Redis is not deployed as a cluster
Single(MultiplexedConnection),
}

Expand Down Expand Up @@ -87,34 +89,35 @@ impl ConnectionLike for RedisConn {

impl RedisCommon {
pub async fn build_conn(&self) -> ConnectorResult<RedisConn> {
let url: Value =
serde_json::from_str(&self.url).map_err(|e| SinkError::Config(anyhow!(e)))?;
match url {
Value::String(s) => {
let client = RedisClient::open(s)?;
match serde_json::from_str(&self.url).map_err(|e| SinkError::Config(anyhow!(e))) {
Ok(v) => {
if let Value::Array(list) = v {
let list = list
.into_iter()
.map(|s| {
if let Value::String(s) = s {
Ok(s)
} else {
Err(SinkError::Redis(
"redis.url must be array of string".to_string(),
)
.into())
}
})
.collect::<ConnectorResult<Vec<String>>>()?;

let client = ClusterClient::new(list)?;
Ok(RedisConn::Cluster(client.get_async_connection().await?))
} else {
Err(SinkError::Redis("redis.url must be array or string".to_string()).into())
}
}
Err(_) => {
let client = RedisClient::open(self.url.clone())?;
Ok(RedisConn::Single(
client.get_multiplexed_async_connection().await?,
))
}
Value::Array(list) => {
let list = list
.into_iter()
.map(|s| {
if let Value::String(s) = s {
Ok(s)
} else {
Err(
SinkError::Redis("redis.url must be array of string".to_string())
.into(),
)
}
})
.collect::<ConnectorResult<Vec<String>>>()?;

let client = ClusterClient::new(list)?;
Ok(RedisConn::Cluster(client.get_async_connection().await?))
}
_ => Err(SinkError::Redis("redis.url must be array or string".to_string()).into()),
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/connector/with_options_sink.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,7 @@ PulsarConfig:
RedisConfig:
fields:
- name: redis.url
field_type: Vec<String>
field_type: String
required: true
StarrocksConfig:
fields:
Expand Down

0 comments on commit 3926983

Please sign in to comment.