advanced-download-and-upload-abort-restart Question #151

Open
danhamill opened this issue Apr 11, 2022 · 8 comments

@danhamill

I am trying to adapt the advanced-download-and-upload-abort-restart example for my own use and seem to have hit an infinite loop once my first file finishes downloading:

import asyncio
import aioftp
import os
import pathlib
import aiofiles
import logging
import pathlib
import aiofiles.os as aos
import async_timeout



async def list_files(model, var, dest_dir):

    pr_paths = [pathlib.PurePosixPath('/pub/dcp/archive/cmip5/loca/LOCA_2016-04-02/ACCESS1-0/16th/historical/r1i1p1/pr/pr_day_ACCESS1-0_historical_r1i1p1_19500101-19501231.LOCA_2016-04-02.16th.nc'), 
                pathlib.PurePosixPath('/pub/dcp/archive/cmip5/loca/LOCA_2016-04-02/ACCESS1-0/16th/historical/r1i1p1/pr/pr_day_ACCESS1-0_historical_r1i1p1_19510101-19511231.LOCA_2016-04-02.16th.nc')]

    for path in pr_paths:
        dest = dest_dir.joinpath(path.name)
        print(f'File to download: {path} \n')
        print(f'Should download to {dest} \n')

        while True:
            try:

                async with aioftp.Client.context('192.12.137.7', user='anonymous', port=21) as client:
                    async with aiofiles.open(dest, mode='ab', ) as local_file:
                        
                        #Check to see if local_file exists
                        if await aos.path.exists(dest):
                            stat = await aos.stat(dest)
                            size = stat.st_size
                        else:
                            size = 0
                        logging.info(f'Starting at postition {size}')
                        local_file.seek(size)
                        
                        async with client.download_stream(path, offset=size) as stream:
                            while True:
                                logging.info(f'Stream Connected')
                                async for block in stream.iter_by_block(512):
                                    if not block:
                                        break
                                    await local_file.write(block)
                                await stream.finish()
                break
            except ConnectionResetError:
                pass

Logging Info

PS C:\workspace\git_clones\loca_download> & C:/Anaconda3/envs/loca_download/python.exe c:/workspace/git_clones/loca_download/scripts/connect_loca_git.py
[09:40:05]: [asyncio] Using proactor: IocpProactor
File to download: /pub/dcp/archive/cmip5/loca/LOCA_2016-04-02/ACCESS1-0/16th/historical/r1i1p1/pr/pr_day_ACCESS1-0_historical_r1i1p1_19500101-19501231.LOCA_2016-04-02.16th.nc 

Should download to output\ACCESS1-0\historical\pr\pr_day_ACCESS1-0_historical_r1i1p1_19500101-19501231.LOCA_2016-04-02.16th.nc

[09:40:05]: [aioftp.client] 220-        **WARNING**WARNING**WARNING**WARNING**WARNING**
[09:40:05]: [aioftp.client] 220- This is a Department of Energy (DOE) computer system. DOE
[09:40:05]: [aioftp.client] 220- computer systems are provided for the processing of official
[09:40:05]: [aioftp.client] 220- U.S. Government information only. All data contained within
[09:40:05]: [aioftp.client] 220- DOE computer systems is owned by the DOE, and may be audited,
[09:40:05]: [aioftp.client] 220- intercepted, recorded, read, copied, or captured in any
[09:40:05]: [aioftp.client] 220- manner and disclosed in any manner, by authorized personnel.
[09:40:05]: [aioftp.client] 220- THERE IS NO RIGHT OF PRIVACY IN THIS SYSTEM. System personnel
[09:40:05]: [aioftp.client] 220- may disclose any potential evidence of crime found on DOE
[09:40:05]: [aioftp.client] 220- computer systems to appropriate authorities.
[09:40:05]: [aioftp.client] 220-
[09:40:05]: [aioftp.client] 220- USE OF THIS SYSTEM BY ANY USER, AUTHORIZED OR UNAUTHORIZED,
[09:40:05]: [aioftp.client] 220- CONSTITUTES CONSENT TO THIS AUDITING, INTERCEPTION, RECORDING,
[09:40:05]: [aioftp.client] 220- READING, COPYING, CAPTURING, AND DISCLOSURE OF COMPUTER ACTIVITY.
[09:40:05]: [aioftp.client] 220-
[09:40:05]: [aioftp.client] 220-        **WARNING**WARNING**WARNING**WARNING**WARNING**
[09:40:05]: [aioftp.client] 220-
[09:40:05]: [aioftp.client] 220
[09:40:05]: [aioftp.client] USER anonymous
[09:40:06]: [aioftp.client] 331 Please specify the password.
[09:40:06]: [aioftp.client] PASS *****
[09:40:06]: [aioftp.client] 230 Login successful.
[09:40:06]: [root] Starting at postition 230992375
[09:40:06]: [aioftp.client] TYPE I
[09:40:06]: [aioftp.client] 200 Switching to Binary mode.
[09:40:06]: [aioftp.client] EPSV
[09:40:06]: [aioftp.client] 229 Entering Extended Passive Mode (|||35457|).
[09:40:06]: [aioftp.client] REST 230992375
[09:40:06]: [aioftp.client] 350 Restart position accepted (230992375).
[09:40:06]: [aioftp.client] RETR /pub/dcp/archive/cmip5/loca/LOCA_2016-04-02/ACCESS1-0/16th/historical/r1i1p1/pr/pr_day_ACCESS1-0_historical_r1i1p1_19500101-19501231.LOCA_2016-04-02.16th.nc
[09:40:06]: [aioftp.client] 150 Opening BINARY mode data connection for /pub/dcp/archive/cmip5/loca/LOCA_2016-04-02/ACCESS1-0/16th/historical/r1i1p1/pr/pr_day_ACCESS1-0_historical_r1i1p1_19500101-19501231.LOCA_2016-04-02.16th.nc (230992375 bytes).
[09:40:06]: [root] Stream Connected
[09:40:06]: [aioftp.client] 226 Transfer complete.
[09:40:06]: [root] Stream Connected

As you can see, the code appears to work properly and the transfer completes. However, it does not move on to the next file.

How can I write a conditional statement that checks for [aioftp.client] 226 Transfer complete?
Which bit of code is that message coming from?

@danhamill
Author

I tried setting something up like:

remote_stat = await client.stat(path)
remote_size = int(remote_stat['size'])

if await aos.path.exists(dest):
    stat = await aos.stat(dest)
    size = stat.st_size
else:
    size = 0
while not remote_size == size:
    async with client.download_stream(path, offset=size) as stream:

But that will only work if the local file is exactly the size of the remote file. How can I check the stream's current position so that I can test it inside the async with client.download_stream block?

@pohmelie
Collaborator

In the first post I see some problems:

async for block in stream.iter_by_block(512):
      if not block:
          break
      await local_file.write(block)

Unfortunately, aioftp is a somewhat chaotic project, so you won't get an empty block at the end of iteration. Maybe it should behave as you expected. As it stands, your check and break will never happen. The iteration ends internally when an empty block comes from the socket, so you just need to iterate until it stops.
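
A minimal sketch of the loop shape described above, assuming only what the reply states: iteration stops on its own at end of data, so no empty-block check is needed. FakeStream is a hypothetical stand-in (not part of aioftp) so the snippet runs without a server; in the real code the stream would be the object yielded by client.download_stream(path, offset=size). Note also that in the first post the break can only leave the async for, while the enclosing while True has no exit of its own, which is consistent with "Stream Connected" being logged a second time after "226 Transfer complete". Summing len(block) gives the current position asked about earlier.

```python
import asyncio
import io


class FakeStream:
    """Hypothetical stand-in for a download stream (not part of aioftp)."""

    def __init__(self, payload: bytes):
        self._payload = payload

    async def iter_by_block(self, count: int = 8192):
        # Only non-empty blocks are yielded; the iteration simply ends at EOF.
        for start in range(0, len(self._payload), count):
            await asyncio.sleep(0)  # give control back to the event loop
            yield self._payload[start:start + count]


async def copy_stream(stream, sink, offset: int = 0, block_size: int = 512) -> int:
    """Write every block to sink and return the final byte position."""
    position = offset
    # No `if not block: break` and no surrounding `while True`:
    # the async for loop finishes by itself when the data runs out.
    async for block in stream.iter_by_block(block_size):
        sink.write(block)
        position += len(block)
    return position


sink = io.BytesIO()
final_position = asyncio.run(copy_stream(FakeStream(b"x" * 1300), sink))
print(final_position)
```

With a real aiofiles handle, sink.write(block) would become await local_file.write(block).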

@danhamill
Author

danhamill commented Apr 12, 2022

Thanks for your reply. I had a long debugging session yesterday and can confirm

if not block:
    break

never evaluates to True. In case somebody else has a similar issue, here is my working solution:

while max_attempts<30:
    
    try:
        logging.info(f'Starting Attempt {max_attempts}')
        async with async_timeout.timeout(20):
            async with aioftp.Client.context('192.12.137.7', user='anonymous', port=21) as client:
                
                if remote_size is None:
                    logging.info(f'Getting remote stats for file {path}')
                    remote_stat = await client.stat(path)
                    remote_size = int(remote_stat['size'])
                    logging.info(f' Remote file has size {remote_size}')
                
                async with aiofiles.open(dest, mode='ab', ) as local_file:
                    
                    #Check to see if local_file exists
                    if await aos.path.exists(dest):
                        stat = await aos.stat(dest)
                        size = stat.st_size
                    else:
                        size = 0
                    logging.info(f'Starting at postition {size}')
                    await local_file.seek(size)  # aiofiles' seek is a coroutine

                    if remote_size == size:
                        break

                    async with client.download_stream(path, offset=size) as stream:
                        async for block in stream.iter_by_block():
                            await local_file.write(block)
                            
    except aioftp.StatusCodeError as ftp_e:

        max_attempts +=1
        logging.info(f'Found aioftp error, trying another attempt')
        if ftp_e.received_codes ==( '426',):
            logging.info(f'Forced timeout error, trying another attempt')

        if ftp_e.received_codes != ( '426',):
            logging.info('new code')
        await asyncio.sleep(1)  # without await the pause never happens
        continue

    except asyncio.exceptions.TimeoutError as asy_e:
        logging.info(f'found time out exception')
        max_attempts +=1
        continue

Adding a timeout of 20 seconds produces a lot of "426 failed to write block" errors and only a few TimeoutErrors. After a little tinkering with the maximum number of attempts, this seems to be a reasonable workflow for ensuring that complete files are downloaded.
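
The retry-and-resume pattern above can be sketched independently of the FTP calls. This is an illustration under stated assumptions, not the aioftp API: fetch is a hypothetical coroutine that appends bytes to dest starting at offset and may raise ConnectionError, and flaky_fetch simulates a connection that drops after 4 bytes. With aioftp, fetch would wrap client.download_stream(path, offset=offset). The size is read before the file is opened, because mode 'ab' creates the file if it is missing, and a local file larger than the remote one is treated as an error rather than as a finished download.

```python
import asyncio
import os
import tempfile


def resume_offset(dest: str) -> int:
    """Bytes already on disk, or 0 when the file does not exist yet."""
    try:
        return os.stat(dest).st_size
    except FileNotFoundError:
        return 0


async def download_with_retries(fetch, dest, remote_size, max_attempts=30, delay=0.0):
    """Call fetch(dest, offset) until dest reaches remote_size; return attempts used."""
    attempts = 0
    while True:
        offset = resume_offset(dest)
        if offset == remote_size:
            return attempts
        if offset > remote_size:
            raise ValueError(f"local file has {offset} bytes, remote only {remote_size}")
        if attempts == max_attempts:
            raise RuntimeError(f"gave up after {max_attempts} attempts")
        attempts += 1
        try:
            await fetch(dest, offset)
        except ConnectionError:
            await asyncio.sleep(delay)  # awaited, so the pause really happens


PAYLOAD = b"0123456789"


async def flaky_fetch(dest, offset):
    # Hypothetical transfer: appends at most 4 bytes, then drops the connection.
    with open(dest, "ab") as fh:
        fh.write(PAYLOAD[offset:offset + 4])
    if offset + 4 < len(PAYLOAD):
        raise ConnectionError("simulated drop")


with tempfile.TemporaryDirectory() as tmp:
    dest = os.path.join(tmp, "file.bin")
    attempts = asyncio.run(download_with_retries(flaky_fetch, dest, len(PAYLOAD)))
    with open(dest, "rb") as fh:
        result = fh.read()
print(attempts, result)
```

Bounding the loop with an explicit attempt counter keeps a permanently failing server from causing an endless retry.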

@pohmelie
Collaborator

This issue should stay open, since I don't understand the nature of the failure: whether it is an FTP client problem or a server problem.

@danhamill
Author

All I can say is that during my debugging, when I got to the point where remote_size == size, async for block in stream.iter_by_block() proceeded to await local_file.write(block) without any variable named block being defined. It did not return a block with a value of None.

When remote_size > size, I could see the bytes for each block.

@pohmelie
Collaborator

@danhamill
I see that break is called when remote_size == size, but I don't see where remote_size is initially defined.

@danhamill
Author

danhamill commented Apr 25, 2022

In my comment, remote_size is set at the top of the download routine, e.g.:

while max_attempts < 30:
    try:
        async with async_timeout.timeout(20):
            async with aioftp.Client.context('192.12.137.7', user='anonymous', port=21) as client:
                # Query the server only on the first attempt
                if remote_size is None:
                    logging.info(f'Getting remote stats for file {path}')
                    remote_stat = await client.stat(path)
                    remote_size = int(remote_stat['size'])
                    logging.info(f' Remote file has size {remote_size}')

This loop sits inside an async def function in my main script. On the first pass remote_size is None, so the file size is fetched from the server. This step is skipped on subsequent attempts.

I have been extensively using this routine and have found some of the files I download locally are larger than what is provided on the server. Do you have any idea why this could happen?

Thanks!

@pohmelie
Collaborator

There is not enough information to make predictions. Try to reproduce this with a big text file, then compare the local "bigger" file and the remote "smaller" file with diff or a similar tool. Also consider logging failures to find out whether there is any correlation between the number of attempts and the differences in file size.
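
One way to carry out the comparison suggested above, as a sketch: locate the first byte offset at which two copies of a file differ. The function name first_difference and the sample file contents are hypothetical; for binary files, a byte-level comparison like this may be easier to read than the output of diff. The sample "local" file repeats a chunk, which is only one assumed way a local copy could end up larger than the remote one.

```python
import os
import tempfile


def first_difference(path_a, path_b, block_size=65536):
    """Return the offset of the first differing byte, or None if the files match.

    If one file is a prefix of the other, the offset is the shorter file's size.
    """
    offset = 0
    with open(path_a, "rb") as fa, open(path_b, "rb") as fb:
        while True:
            a = fa.read(block_size)
            b = fb.read(block_size)
            if a != b:
                for i, (x, y) in enumerate(zip(a, b)):
                    if x != y:
                        return offset + i
                # No mismatch in the common part: one block is shorter.
                return offset + min(len(a), len(b))
            if not a:  # both files ended at the same point
                return None
            offset += len(a)


with tempfile.TemporaryDirectory() as tmp:
    remote_copy = os.path.join(tmp, "remote.bin")
    local_copy = os.path.join(tmp, "local.bin")
    with open(remote_copy, "wb") as fh:
        fh.write(b"abcdefgh")
    with open(local_copy, "wb") as fh:
        fh.write(b"abcdefdefgh")  # assumed corruption: "def" written twice
    diff_at = first_difference(remote_copy, local_copy)
    same = first_difference(remote_copy, remote_copy)
    sizes = (os.path.getsize(remote_copy), os.path.getsize(local_copy))
print(diff_at, same, sizes)
```

Logging the offset passed to each download attempt alongside this result would show whether the divergence lines up with a resume point.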
