Commit

Merge remote-tracking branch 'origin/master' into HEAD
vbarbaresi committed Jan 16, 2021
2 parents a434042 + 7b7ff90 commit 80effb5
Showing 7 changed files with 81 additions and 70 deletions.
2 changes: 2 additions & 0 deletions README.rst
@@ -182,12 +182,14 @@ Some more companies are using Luigi but haven't had a chance yet to write about
* `Hopper <https://www.hopper.com/>`_
* `VOYAGE GROUP/Zucks <https://zucks.co.jp/en/>`_
* `Textpert <https://www.textpert.ai/>`_
* `Tracktics <https://www.tracktics.com/>`_
* `Whizar <https://www.whizar.com/>`_
* `xtream <https://www.xtreamers.io/>`__
* `Skyscanner <https://www.skyscanner.net/>`_
* `Jodel <https://www.jodel.com/>`_
* `Mekar <https://mekar.id/en/>`_
* `M3 <https://corporate.m3.com/en/>`_
* `Assist Digital <https://www.assistdigital.com/>`_

We're more than happy to have your company added here. Just send a PR on GitHub.

14 changes: 7 additions & 7 deletions doc/central_scheduler.rst
@@ -78,20 +78,20 @@ The task history has the following pages:

.. figure:: history.png
:alt: Recent history screenshot
* ``/history/by_id/:id``
* ``/history/by_id/{id}``
detailed information about a run, including:
parameter values, the host on which it ran, and timing information.
Example screenshot:

.. figure:: history_by_id.png
:alt: By id screenshot
* ``/history/by_name/:name``
a listing of all runs of a task with the given task name.
* ``/history/by_name/{name}``
a listing of all runs of the task with the given ``{name}``.
Example screenshot:

.. figure:: history_by_name.png
:alt: By name screenshot
* ``/history/by_params/:name?data=params``
a listing of all runs of a given task restricted to runs with param values matching the given data.
The data is a json blob describing the parameters,
e.g. ``{"foo": "bar"}`` looks for a task with ``foo=bar``.
* ``/history/by_params/{name}?data=params``
a listing of all runs of the task ``{name}``, restricted to runs whose parameter values match the given ``params``.
``params`` is a JSON blob describing the parameters,
e.g. ``data={"foo": "bar"}`` looks for a task with ``foo=bar``.
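
For illustration, a minimal sketch of querying the ``by_params`` page from Python, assuming the scheduler runs on ``localhost:8082`` with task history enabled; the task name ``MyTask`` is hypothetical, and the JSON blob is passed URL-encoded in the ``data`` query parameter:

import json
from urllib.parse import urlencode
from urllib.request import urlopen

params = {"foo": "bar"}  # look for runs that had foo=bar
query = urlencode({"data": json.dumps(params)})
url = "http://localhost:8082/history/by_params/MyTask?" + query

with urlopen(url) as resp:  # the scheduler responds with the rendered history page
    page = resp.read().decode()
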
1 change: 1 addition & 0 deletions luigi/contrib/external_program.py
@@ -192,6 +192,7 @@ def _track_url_by_pattern():
self.build_tracking_url(match.group(1))
)
else:
file_to_write.flush()
sleep(time_to_sleep)

track_proc = Process(target=_track_url_by_pattern)
76 changes: 30 additions & 46 deletions luigi/contrib/gcs.py
@@ -26,12 +26,21 @@
from urllib.parse import urlsplit
from io import BytesIO

from tenacity import retry
from tenacity import retry_if_exception
from tenacity import retry_if_exception_type
from tenacity import wait_exponential
from tenacity import stop_after_attempt
from tenacity import after_log
from luigi.contrib import gcp
import luigi.target
from luigi.format import FileWrapper

logger = logging.getLogger('luigi-interface')

# Errors that should trigger a retry (populated below when the GCS client libraries are available)
RETRYABLE_ERRORS = None

try:
import httplib2

@@ -42,12 +51,8 @@
logger.warning("Loading GCS module without the python packages googleapiclient & google-auth. \
This will crash at runtime if GCS functionality is used.")
else:
# Retry transport and file IO errors.
RETRYABLE_ERRORS = (httplib2.HttpLib2Error, IOError)

# Number of times to retry failed downloads.
NUM_RETRIES = 5

# Number of bytes to send/receive in each request.
CHUNKSIZE = 10 * 1024 * 1024

@@ -64,6 +69,18 @@
GCS_BATCH_URI = 'https://storage.googleapis.com/batch/storage/v1'


# Retry configurations. For more details, see https://tenacity.readthedocs.io/en/latest/
def is_error_5xx(err):
return isinstance(err, errors.HttpError) and err.resp.status >= 500


gcs_retry = retry(retry=(retry_if_exception(is_error_5xx) | retry_if_exception_type(RETRYABLE_ERRORS)),
wait=wait_exponential(multiplier=1, min=1, max=10),
stop=stop_after_attempt(5),
reraise=True,
after=after_log(logger, logging.WARNING))


def _wait_for_consistency(checker):
"""Eventual consistency: wait until GCS reports something is true.
@@ -133,6 +150,7 @@ def _is_root(self, key):
def _add_path_delimiter(self, key):
return key if key[-1:] == '/' else key + '/'

@gcs_retry
def _obj_exists(self, bucket, obj):
try:
self.client.objects().get(bucket=bucket, object=obj).execute()
@@ -157,6 +175,7 @@ def _list_iter(self, bucket, prefix):

response = request.execute()

@gcs_retry
def _do_put(self, media, dest_path):
bucket, obj = self._path_to_bucket_and_key(dest_path)

@@ -165,28 +184,10 @@ def _do_put(self, media, dest_path):
return request.execute()

response = None
attempts = 0
while response is None:
error = None
try:
status, response = request.next_chunk()
if status:
logger.debug('Upload progress: %.2f%%', 100 * status.progress())
except errors.HttpError as err:
error = err
if err.resp.status < 500:
raise
logger.warning('Caught error while uploading', exc_info=True)
except RETRYABLE_ERRORS as err:
logger.warning('Caught error while uploading', exc_info=True)
error = err

if error:
attempts += 1
if attempts >= NUM_RETRIES:
raise error
else:
attempts = 0
status, response = request.next_chunk()
if status:
logger.debug('Upload progress: %.2f%%', 100 * status.progress())

_wait_for_consistency(lambda: self._obj_exists(bucket, obj))
return response
@@ -380,6 +381,7 @@ def list_wildcard(self, wildcard_path):
len(it) >= len(path + '/' + wildcard_parts[0]) + len(wildcard_parts[1]):
yield it

@gcs_retry
def download(self, path, chunksize=None, chunk_callback=lambda _: False):
"""Downloads the object contents to local file system.
@@ -400,29 +402,11 @@ def download(self, path, chunksize=None, chunk_callback=lambda _: False):
request = self.client.objects().get_media(bucket=bucket, object=obj)
downloader = http.MediaIoBaseDownload(fp, request, chunksize=chunksize)

attempts = 0
done = False
while not done:
error = None
try:
_, done = downloader.next_chunk()
if chunk_callback(fp):
done = True
except errors.HttpError as err:
error = err
if err.resp.status < 500:
raise
logger.warning('Error downloading file, retrying', exc_info=True)
except RETRYABLE_ERRORS as err:
logger.warning('Error downloading file, retrying', exc_info=True)
error = err

if error:
attempts += 1
if attempts >= NUM_RETRIES:
raise error
else:
attempts = 0
_, done = downloader.next_chunk()
if chunk_callback(fp):
done = True

return return_fp
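
The ``gcs_retry`` decorator introduced above combines an exception-type check with a custom 5xx predicate, exponential backoff, and a five-attempt cap. Below is a standalone sketch of the same tenacity pattern; the names ``TransientError`` and ``flaky_call`` are illustrative and not part of Luigi or the GCS client:

import logging
from tenacity import (retry, retry_if_exception, retry_if_exception_type,
                      wait_exponential, stop_after_attempt, after_log)

logger = logging.getLogger(__name__)


class TransientError(Exception):
    """Illustrative stand-in for an HTTP error carrying a status code."""
    def __init__(self, status):
        super().__init__("status %d" % status)
        self.status = status


def is_server_error(err):
    # Mirrors is_error_5xx above: only 5xx-style failures are worth retrying.
    return isinstance(err, TransientError) and err.status >= 500


attempts = []


@retry(retry=(retry_if_exception(is_server_error) | retry_if_exception_type(IOError)),
       wait=wait_exponential(multiplier=1, min=1, max=10),
       stop=stop_after_attempt(5),
       reraise=True,
       after=after_log(logger, logging.WARNING))
def flaky_call():
    attempts.append(1)
    if len(attempts) < 3:
        raise TransientError(503)  # retryable: the predicate accepts 5xx errors
    return "ok"


print(flaky_call())  # succeeds on the third attempt, after two retries
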

30 changes: 15 additions & 15 deletions luigi/tools/deps_tree.py
@@ -9,18 +9,18 @@
$ luigi-deps-tree --module foo_complex examples.Foo
...
└─--[Foo-{} (PENDING)]
|--[Bar-{'num': '0'} (PENDING)]
| |--[Bar-{'num': '4'} (PENDING)]
| └─--[Bar-{'num': '5'} (PENDING)]
|--[Bar-{'num': '1'} (PENDING)]
└─--[Bar-{'num': '2'} (PENDING)]
└─--[Bar-{'num': '6'} (PENDING)]
|--[Bar-{'num': '7'} (PENDING)]
| |--[Bar-{'num': '9'} (PENDING)]
| └─--[Bar-{'num': '10'} (PENDING)]
| └─--[Bar-{'num': '11'} (PENDING)]
└─--[Bar-{'num': '8'} (PENDING)]
└─--[Bar-{'num': '12'} (PENDING)]
|---[Bar-{'num': '0'} (PENDING)]
| |---[Bar-{'num': '4'} (PENDING)]
| └─--[Bar-{'num': '5'} (PENDING)]
|---[Bar-{'num': '1'} (PENDING)]
└─--[Bar-{'num': '2'} (PENDING)]
└─--[Bar-{'num': '6'} (PENDING)]
|---[Bar-{'num': '7'} (PENDING)]
| |---[Bar-{'num': '9'} (PENDING)]
| └─--[Bar-{'num': '10'} (PENDING)]
| └─--[Bar-{'num': '11'} (PENDING)]
└─--[Bar-{'num': '8'} (PENDING)]
└─--[Bar-{'num': '12'} (PENDING)]
"""

from luigi.task import flatten
@@ -52,10 +52,10 @@ def print_tree(task, indent='', last=True):
result = '\n' + indent
if(last):
result += '└─--'
indent += ' '
indent += ' '
else:
result += '|--'
indent += '| '
result += '|---'
indent += '| '
result += '[{0}-{1} ({2})]'.format(name, params, is_complete)
children = flatten(task.requires())
for index, child in enumerate(children):
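
A self-contained sketch of the branch-drawing logic this hunk adjusts, run on a plain dict instead of Luigi tasks, to show how ``last`` selects the ``└─--`` versus ``|---`` glyph and how the indent grows; the ``deps`` structure below is made up for illustration:

def render_tree(node, children, indent='', last=True):
    # children maps a node name to the list of nodes it depends on
    result = '\n' + indent
    if last:
        result += '└─--'
        indent += ' ' * 4
    else:
        result += '|---'
        indent += '|   '
    result += '[{0}]'.format(node)
    kids = children.get(node, [])
    for index, child in enumerate(kids):
        result += render_tree(child, children, indent, (index + 1) == len(kids))
    return result


deps = {'Foo': ['Bar0', 'Bar1'], 'Bar0': ['Bar4']}
print(render_tree('Foo', deps))  # Bar0 gets the |--- glyph, Bar1 and Bar4 get └─--
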
2 changes: 1 addition & 1 deletion setup.py
@@ -37,7 +37,7 @@ def get_static_files(path):
with open('README.rst') as fobj:
long_description = "\n\n" + readme_note + "\n\n" + fobj.read()

install_requires = ['python-dateutil>=2.7.5,<3']
install_requires = ['python-dateutil>=2.7.5,<3', 'tenacity>=6.3.0,<7']

# Can't use python-daemon>=2.2.0 if on windows
# See https://pagure.io/python-daemon/issue/18
26 changes: 25 additions & 1 deletion test/contrib/gcs_test.py
@@ -30,6 +30,7 @@
import os
import tempfile
import unittest
from unittest import mock

from luigi.contrib import gcs
from target_test import FileSystemTargetTestMixin
@@ -143,7 +144,7 @@ def test_listdir(self):

def test_put_file(self):
with tempfile.NamedTemporaryFile() as fp:
lorem = 'Lorem ipsum dolor sit amet, consectetuer adipiscing elit, sed diam nonummy nibh euismod tincidunt\n'
lorem = b'Lorem ipsum dolor sit amet, consectetuer adipiscing elit, sed diam nonummy nibh euismod tincidunt\n'
# Larger file than chunk size, fails with incorrect progress set up
big = lorem * 41943
fp.write(big)
@@ -196,3 +197,26 @@ def test_close_twice(self):
assert src.closed
src.close()
assert src.closed


class RetryTest(unittest.TestCase):
def test_success_with_retryable_error(self):
m = mock.MagicMock(side_effect=[IOError, IOError, 'test_func_output'])

@gcs.gcs_retry
def mock_func():
return m()

actual = mock_func()
expected = 'test_func_output'
self.assertEqual(expected, actual)

def test_fail_with_retry_limit_exceed(self):
m = mock.MagicMock(side_effect=[IOError, IOError, IOError, IOError, IOError])

@gcs.gcs_retry
def mock_func():
return m()

with self.assertRaises(IOError):
mock_func()
