Skip to content
This repository has been archived by the owner on Jan 30, 2021. It is now read-only.

Installation failed with logstash 6.2+ #14 #15

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ Please note that the name of the plugin when used is `clickhouse`, it only suppo
* `automatic_retries` (default: 1) - number of connect retry attempts to each host in `http_hosts`
* `request_tolerance` (default: 5) - number of http request send retry attempts if response status code is not 200
* `backoff_time` (default: 3) - time to wait in seconds for next retry attempt of connect or request
* `skip_unknown` (0 or 1, default: 1) - skip unknown fields when inserting into clickhouse. Uses `--input_format_skip_unknown_fields` parameter

Default batch size is 50, with a wait of at most 5 seconds per send. These can be tweaked with the parameters `flush_size` and `idle_flush_time` respectively.

Expand All @@ -39,3 +40,4 @@ To build the gem yourself, use `gem build logstash-output-clickhouse.gemspec` in

To install, run the following command, assuming the gem is in the local directory: `$LOGSTASH_HOME/bin/plugin install logstash-output-clickhouse-X.Y.Z.gem`

P.S. Tested on Logstash 7.1.1
26 changes: 4 additions & 22 deletions lib/logstash/outputs/clickhouse.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ class LogStash::Outputs::ClickHouse < LogStash::Outputs::Base

config :table, :validate => :string, :required => true

config :skip_unknown, :validate => :number, :default => 1, :inclusion => 0..1

# Custom headers to use
# format is `headers => ["X-My-Header", "%{host}"]`
config :headers, :validate => :hash
Expand Down Expand Up @@ -73,7 +75,7 @@ def register
@request_tokens = SizedQueue.new(@pool_max)
@pool_max.times {|t| @request_tokens << true }
@requests = Array.new
@http_query = "/?query=INSERT%20INTO%20#{table}%20FORMAT%20JSONEachRow"
@http_query = "/?input_format_skip_unknown_fields=#{skip_unknown}&query=INSERT%20INTO%20#{table}%20FORMAT%20JSONEachRow"

@hostnames_pool =
parse_http_hosts(http_hosts,
Expand Down Expand Up @@ -126,32 +128,12 @@ def receive(event)
buffer_receive(event)
end

def mutate( src )
res = {}
@mutations.each_pair do |dstkey, source|
case source
when String then
scrkey = source
next unless src.key?(scrkey)

res[dstkey] = src[scrkey]
when Array then
scrkey = source[0]
next unless src.key?(scrkey)
pattern = source[1]
replace = source[2]
res[dstkey] = src[scrkey].sub( Regexp.new(pattern), replace )
end
end
res
end

public
def flush(events, close=false)
documents = "" #this is the string of hashes that we push to Fusion as documents

events.each do |event|
documents << LogStash::Json.dump( mutate( event.to_hash() ) ) << "\n"
documents << LogStash::Json.dump( event ) << "\n"
end

hosts = get_host_addresses()
Expand Down
2 changes: 1 addition & 1 deletion logstash-output-clickhouse.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ Gem::Specification.new do |s|

# Gem dependencies
s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99"
s.add_runtime_dependency "logstash-mixin-http_client", ">= 6.0.0", "<= 7.0.0"
s.add_runtime_dependency "logstash-mixin-http_client", ">= 6.0.0", "< 9.0.0"
s.add_runtime_dependency 'mini_cache', ">= 1.0.0", "< 2.0.0"

s.add_development_dependency 'logstash-devutils'
Expand Down