Skip to content

Commit

Permalink
add file lock to avoid race condition
Browse files Browse the repository at this point in the history
Signed-off-by: lixiaoyu <[email protected]>
  • Loading branch information
lixiaoyu committed Mar 5, 2020
1 parent bebdcf4 commit d99cc66
Show file tree
Hide file tree
Showing 2 changed files with 120 additions and 2 deletions.
23 changes: 21 additions & 2 deletions prometheus_client/multiprocess.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
from __future__ import unicode_literals

from collections import defaultdict
from functools import wraps
import glob
import json
import os

from .process_lock import lock, unlock, LOCK_EX
from .metrics_core import Metric
from .mmap_dict import MmapedDict, mmap_key
from .samples import Sample
Expand All @@ -18,6 +20,21 @@
MP_METRIC_HELP = 'Multiprocess metric'


def require_metrics_lock(func):
@wraps(func)
def _wrap(*args, **kwargs):
path = os.environ.get('prometheus_multiproc_dir')
f = open(os.path.join(path, 'metrics.lock'), 'w')
try:
lock(f, LOCK_EX)
return func(*args, **kwargs)
finally:
unlock(f)
f.close()

return _wrap


class MultiProcessCollector(object):
"""Collector for files for multi-process mode."""

Expand All @@ -31,6 +48,7 @@ def __init__(self, registry, path=None):
registry.register(self)

@staticmethod
@require_metrics_lock
def merge(files, accumulate=True):
"""Merge metrics from given mmap files.
Expand Down Expand Up @@ -149,6 +167,7 @@ def collect(self):
return self.merge(files, accumulate=True)


@require_metrics_lock
def mark_process_dead(pid, path=None):
"""Do bookkeeping for when one process dies in a multi-process setup."""
if path is None:
Expand All @@ -175,8 +194,8 @@ def mark_process_dead(pid, path=None):
if not os.path.exists(merge_file):
MmapedDict(merge_file).close()

# do merge
metrics = MultiProcessCollector(None).merge(files + merge_files, accumulate=False)
# do merge, here we use the same method to merge
metrics = MultiProcessCollector.merge(files + merge_files, accumulate=False)
typ_metrics_dict = defaultdict(list)
for metric in metrics:
typ_metrics_dict[metric.type].append(metric)
Expand Down
99 changes: 99 additions & 0 deletions prometheus_client/process_lock.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
#!/usr/bin/env python
# coding=utf-8

import os

__all__ = ('LOCK_EX', 'LOCK_SH', 'LOCK_NB', 'lock', 'unlock')


def _fd(f):
return f.fileno() if hasattr(f, 'fileno') else f


if os.name == 'nt':
import msvcrt
from ctypes import (sizeof, c_ulong, c_void_p, c_int64,
Structure, Union, POINTER, windll, byref)
from ctypes.wintypes import BOOL, DWORD, HANDLE

LOCK_SH = 0 # the default
LOCK_NB = 0x1 # LOCKFILE_FAIL_IMMEDIATELY
LOCK_EX = 0x2 # LOCKFILE_EXCLUSIVE_LOCK

if sizeof(c_ulong) != sizeof(c_void_p):
ULONG_PTR = c_int64
else:
ULONG_PTR = c_ulong
PVOID = c_void_p


class _OFFSET(Structure):
_fields_ = [
('Offset', DWORD),
('OffsetHigh', DWORD)]


class _OFFSET_UNION(Union):
_anonymous_ = ['_offset']
_fields_ = [
('_offset', _OFFSET),
('Pointer', PVOID)]


class OVERLAPPED(Structure):
_anonymous_ = ['_offset_union']
_fields_ = [
('Internal', ULONG_PTR),
('InternalHigh', ULONG_PTR),
('_offset_union', _OFFSET_UNION),
('hEvent', HANDLE)]


LPOVERLAPPED = POINTER(OVERLAPPED)

LockFileEx = windll.kernel32.LockFileEx
LockFileEx.restype = BOOL
LockFileEx.argtypes = [HANDLE, DWORD, DWORD, DWORD, DWORD, LPOVERLAPPED]
UnlockFileEx = windll.kernel32.UnlockFileEx
UnlockFileEx.restype = BOOL
UnlockFileEx.argtypes = [HANDLE, DWORD, DWORD, DWORD, LPOVERLAPPED]


def lock(f, flags):
hfile = msvcrt.get_osfhandle(_fd(f))
overlapped = OVERLAPPED()
ret = LockFileEx(hfile, flags, 0, 0, 0xFFFF0000, byref(overlapped))
return bool(ret)


def unlock(f):
hfile = msvcrt.get_osfhandle(_fd(f))
overlapped = OVERLAPPED()
ret = UnlockFileEx(hfile, 0, 0, 0xFFFF0000, byref(overlapped))
return bool(ret)
else:
try:
import fcntl

LOCK_SH = fcntl.LOCK_SH # shared lock
LOCK_NB = fcntl.LOCK_NB # non-blocking
LOCK_EX = fcntl.LOCK_EX
except (ImportError, AttributeError):
LOCK_EX = LOCK_SH = LOCK_NB = 0


def lock(f, flags):
return False


def unlock(f):
return True
else:
def lock(f, flags):
ret = fcntl.flock(_fd(f), flags)
return ret == 0


def unlock(f):
ret = fcntl.flock(_fd(f), fcntl.LOCK_UN)
return ret == 0

0 comments on commit d99cc66

Please sign in to comment.