Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add socket_path option to enable unix socket traffic to dogstatsd6 #199

Merged
merged 5 commits into from
Jun 6, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
CHANGELOG
=========

# 0.17.0 / unreleased
* [FEATURE] DogStatsD: add socket_path option to enable unix socket traffic to dogstatsd6 [199][]

# 0.16.0 / 2017-04-26
* [FEATURE] Dogshell: Add filtering options to the `monitor show_all` command, [#194][]

Expand Down
42 changes: 27 additions & 15 deletions datadog/dogstatsd/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ class DogStatsd(object):
OK, WARNING, CRITICAL, UNKNOWN = (0, 1, 2, 3)

def __init__(self, host='localhost', port=8125, max_buffer_size=50, namespace=None,
constant_tags=None, use_ms=False, use_default_route=False):
constant_tags=None, use_ms=False, use_default_route=False,
socket_path=None):
"""
Initialize a DogStatsd object.

Expand Down Expand Up @@ -56,10 +57,20 @@ def __init__(self, host='localhost', port=8125, max_buffer_size=50, namespace=No
(Useful when running the client in a container) (Linux only)
:type use_default_route: boolean

:param socket_path: Communicate with dogstatsd through a UNIX socket instead of
UDP. If set, disables UDP transmission (Linux only)
:type socket_path: string
"""

# Connection
self.host = self.resolve_host(host, use_default_route)
self.port = int(port)
if socket_path is not None:
self.socket_path = socket_path
self.host = None
self.port = None
else:
self.socket_path = None
self.host = self.resolve_host(host, use_default_route)
self.port = int(port)

# Socket
self.socket = None
Expand Down Expand Up @@ -105,9 +116,15 @@ def get_socket(self):
avoid bad thread race conditions.
"""
if not self.socket:
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.connect((self.host, self.port))
self.socket = sock
if self.socket_path is not None:
sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
sock.connect(self.socket_path)
sock.setblocking(0)
self.socket = sock
else:
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.connect((self.host, self.port))
self.socket = sock

return self.socket

Expand Down Expand Up @@ -265,18 +282,13 @@ def _send_to_server(self, packet):
try:
# If set, use socket directly
(self.socket or self.get_socket()).send(packet.encode(self.encoding))
except socket.timeout:
# dogstatsd is overflowing, drop the packets (mimicks the UDP behaviour)
return
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this? From what I understand, we should never get a timeout on non-blocking mode.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The python socket implementation implements non-blocking send as a zero-timeout send and sends a socket.timeout exception if the write does not return immediately (the queue, managed by the kernel, is full), I chose to silently drop the packet, mirroring what UDP would do. With dogstatsd6's goroutine-based intake, this should only happen if the CPU is saturated.

If needed, the socket queue length is configurable via sysctl net.unix.max_dgram_qlen (10 on my test machines).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I chose to silently drop the packet, mirroring what UDP would do

@xvello I wonder if we should still add some logging here. What do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That could turn into log spamming really fast. But I'll open another PR to add a fail_on_error option

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That could turn into log spamming really fast

That's a good point!

except socket.error:
log.info("Error submitting packet, will try refreshing the socket")

log.info("Error submitting packet, dropping the packet and closing the socket")
self.close_socket()

try:
self.get_socket().send(packet.encode(self.encoding))
except socket.error:
self.close_socket()

log.exception("Failed to send packet with a newly bound socket")

def _send_to_buffer(self, packet):
self.buffer.append(packet)
if len(self.buffer) >= self.max_buffer_size:
Expand Down