Skip to content

Commit

Permalink
refactor(services/webhdfs): Rewrite webhdfs methods signature by us…
Browse files Browse the repository at this point in the history
…ing `OpXxxx` (#3109)

* refactor(service/webHDFS): Passing OpWrite instead of content_type

* refactor(service/webHDFS): Passing OpList instead of start_after

refactor(service/webHDFS): Passing OpList instead of start_after

* use existing API of raw::ops

* format the code snippets of modification
  • Loading branch information
cxorm authored Sep 18, 2023
1 parent 09c19c0 commit 9b4b762
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 8 deletions.
12 changes: 6 additions & 6 deletions core/src/services/webhdfs/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ impl WebhdfsBackend {
&self,
path: &str,
size: Option<usize>,
content_type: Option<&str>,
args: &OpWrite,
body: AsyncBody,
) -> Result<Request<AsyncBody>> {
let p = build_abs_path(&self.root, path);
Expand Down Expand Up @@ -230,7 +230,7 @@ impl WebhdfsBackend {
if let Some(size) = size {
req = req.header(CONTENT_LENGTH, size.to_string());
}
if let Some(content_type) = content_type {
if let Some(content_type) = args.content_type() {
req = req.header(CONTENT_TYPE, content_type);
}

Expand Down Expand Up @@ -296,12 +296,12 @@ impl WebhdfsBackend {
pub(super) fn webhdfs_list_status_batch_request(
&self,
path: &str,
start_after: &Option<String>,
args: &OpList,
) -> Result<Request<AsyncBody>> {
let p = build_abs_path(&self.root, path);

// if it's not the first time to call LISTSTATUS_BATCH, we will add &startAfter=<CHILD>
let start_after_param = match start_after {
let start_after_param = match args.start_after() {
Some(sa) if sa.is_empty() => String::new(),
Some(sa) => format!("&startAfter={}", sa),
None => String::new(),
Expand Down Expand Up @@ -430,7 +430,7 @@ impl Accessor for WebhdfsBackend {
/// Create a file or directory
async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
let req = self
.webhdfs_create_object_request(path, Some(0), None, AsyncBody::Empty)
.webhdfs_create_object_request(path, Some(0), &OpWrite::default(), AsyncBody::Empty)
.await?;

let resp = self.client.send(req).await?;
Expand Down Expand Up @@ -535,7 +535,7 @@ impl Accessor for WebhdfsBackend {
let path = path.trim_end_matches('/');

if !self.disable_list_batch {
let req = self.webhdfs_list_status_batch_request(path, &None)?;
let req = self.webhdfs_list_status_batch_request(path, &OpList::default())?;
let resp = self.client.send(req).await?;
match resp.status() {
StatusCode::OK => {
Expand Down
6 changes: 5 additions & 1 deletion core/src/services/webhdfs/pager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,13 @@ impl oio::Page for WebhdfsPager {
return match self.backend.disable_list_batch {
true => self.webhdfs_get_next_list_statuses(),
false => {
let args = OpList::with_start_after(
OpList::default(),
&self.batch_start_after.clone().unwrap(),
);
let req = self
.backend
.webhdfs_list_status_batch_request(&self.path, &self.batch_start_after)?;
.webhdfs_list_status_batch_request(&self.path, &args)?;
let resp = self.backend.client.send(req).await?;

match resp.status() {
Expand Down
2 changes: 1 addition & 1 deletion core/src/services/webhdfs/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ impl oio::OneShotWrite for WebhdfsWriter {
.webhdfs_create_object_request(
&self.path,
Some(bs.len()),
self.op.content_type(),
&self.op,
AsyncBody::Bytes(bs),
)
.await?;
Expand Down

0 comments on commit 9b4b762

Please sign in to comment.