Skip to content

Commit

Permalink
Log Optimization
Browse files Browse the repository at this point in the history
  • Loading branch information
archfish committed Dec 17, 2019
1 parent 0ba0bdb commit fa8b311
Show file tree
Hide file tree
Showing 22 changed files with 81 additions and 23 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ mv ${PB_OUT}PulsarMarkers_pb.rb ${PB_OUT}pulsar_markers.pb.rb
- [ ] Service discovery
- [x] Topic lookup
- [x] Partitioned topics discovery
- [ ] Log Optimization
- [x] Log Optimization
- [x] Connection pool
- [ ] Unit Test
- [x] Thread safe
Expand All @@ -72,7 +72,7 @@ mv ${PB_OUT}PulsarMarkers_pb.rb ${PB_OUT}pulsar_markers.pb.rb

## WIP

- Log Optimization
- Dead Letter Topic

[1]: https://github.com/apache/pulsar/wiki/PIP-26%3A-Delayed-Message-Delivery "PIP 26: Delayed Message Delivery"
[2]: https://pulsar.apache.org/docs/en/develop-binary-protocol/ "Pulsar binary protocol specification"
Expand Down
7 changes: 4 additions & 3 deletions examples/consumer_close.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,17 @@
th = Thread.new do
@consumer.flow
_cmd, msg = @consumer.receive
puts "msg: #{msg || 'stoped'}"
PulsarSdk.logger.info('msg') {msg || 'stoped'}
end

# =========stop listen============

th = Thread.new do
@consumer.listen do |cmd, msg|
puts "msg: #{msg}"
PulsarSdk.logger.info('msg') {msg}
end
puts "stoped"

PulsarSdk.logger.info('stoped')
end

@consumer.close
Expand Down
12 changes: 6 additions & 6 deletions examples/consumer_listen.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@

#================Manual ack================#
consumer.listen do |cmd, msg|
puts "cmd => #{cmd}"
puts "msg => #{msg}"
PulsarSdk.logger.info('cmd') {cmd}
PulsarSdk.logger.info('msg') {msg}
msg.ack
end

Expand All @@ -23,14 +23,14 @@

# ack after process
consumer.listen(true) do |cmd, msg|
puts "cmd => #{cmd}"
puts "msg => #{msg}"
PulsarSdk.logger.info('cmd') {cmd}
PulsarSdk.logger.info('msg') {msg}
end

# nack after process
consumer.listen(true) do |cmd, msg|
puts "cmd => #{cmd}"
puts "msg => #{msg}"
PulsarSdk.logger.info('cmd') {cmd}
PulsarSdk.logger.info('msg') {msg}
false
end

Expand Down
25 changes: 23 additions & 2 deletions lib/pulsar_sdk.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
require 'logger'
require 'protobuf/pulsar_api.pb'
require "pulsar_sdk/version"
require 'pulsar_sdk/tweaks'
Expand All @@ -8,6 +9,26 @@
require 'pulsar_sdk/client'

module PulsarSdk
class Error < StandardError; end
# Your code goes here...
extend self

def logger
@logger ||= Logger.new(STDOUT).tap do |logger|
logger.formatter = Formatter.new
end
end

def logger=(v)
@logger = v
end

class Formatter < ::Logger::Formatter
def call(severity, timestamp, progname, msg)
case msg
when ::StandardError
msg = [msg.message, msg&.backtrace].join(":\n")
end

super
end
end
end
11 changes: 6 additions & 5 deletions lib/pulsar_sdk/client/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
module PulsarSdk
module Client
class Connection
prepend ::PulsarSdk::Tweaks::CleanInspect

CLIENT_NAME = "pulsar-client-#{PulsarSdk::VERSION}".freeze
PROTOCOL_VER = Pulsar::Proto::ProtocolVersion::V13

Expand Down Expand Up @@ -106,8 +108,7 @@ def run
rescue Errno::ETIMEDOUT
# read timeout, do nothing
rescue => e
puts "ERROR: reader exist, cause by #{e}"
puts "BACKTRACE: #{e.backtrace.join("\n")}"
PulsarSdk.logger.fatal("reader exist!!") {e}
@state.closed!
end
end
Expand Down Expand Up @@ -172,12 +173,12 @@ def write(bytes)
end

def handle_base_command(cmd, payload)
puts "INFO: handle_base_command: #{cmd.type}"
PulsarSdk.logger.debug(__method__){cmd.type}
case
when cmd.typeof_success?
handle_response(cmd)
when cmd.typeof_connected?
puts "#{cmd.type}: #{cmd.connected}"
PulsarSdk.logger.info(__method__){"#{cmd.type}: #{cmd.connected}"}
when cmd.typeof_producer_success?
handle_response(cmd)
when cmd.typeof_lookup_response?
Expand All @@ -190,7 +191,7 @@ def handle_base_command(cmd, payload)
when cmd.typeof_partitioned_metadata_response?
handle_response(cmd)
when cmd.typeof_error?
puts "ERROR: #{cmd.error} \n #{cmd.message}"
PulsarSdk.logger.error(__method__){"#{cmd.error}: #{cmd.message}"}
when cmd.typeof_close_producer?
when cmd.typeof_close_consumer?
when cmd.typeof_message?
Expand Down
2 changes: 2 additions & 0 deletions lib/pulsar_sdk/client/connection_pool.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
module PulsarSdk
module Client
class ConnectionPool
prepend ::PulsarSdk::Tweaks::CleanInspect

def initialize(opts)
raise "opts expected a PulsarSdk::Options::Connection got #{opts.class}" unless opts.is_a?(PulsarSdk::Options::Connection)

Expand Down
2 changes: 2 additions & 0 deletions lib/pulsar_sdk/client/rpc.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
module PulsarSdk
module Client
class Rpc
prepend ::PulsarSdk::Tweaks::CleanInspect

def initialize(opts)
raise "opts expected a PulsarSdk::Options::Connection got #{opts.class}" unless opts.is_a?(PulsarSdk::Options::Connection)

Expand Down
2 changes: 1 addition & 1 deletion lib/pulsar_sdk/consumer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ def listen(autoack = false)
msg.ack if autoack

if !msg.confirmed?
puts "WARN: message was not confiremed! message_id: #{msg.message_id}"
PulsarSdk.logger.warn('message was not confiremed') {msg.message_id}
end
end
end
Expand Down
1 change: 1 addition & 0 deletions lib/pulsar_sdk/options/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ module PulsarSdk
module Options
class Base
prepend ::PulsarSdk::Tweaks::AssignAttributes
prepend ::PulsarSdk::Tweaks::CleanInspect
end
end
end
2 changes: 2 additions & 0 deletions lib/pulsar_sdk/producer/manager.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
module PulsarSdk
module Producer
class Manager
prepend ::PulsarSdk::Tweaks::CleanInspect

def initialize(client, opts)
@producers = init_producer_by(client, opts)
@router = opts.router
Expand Down
3 changes: 3 additions & 0 deletions lib/pulsar_sdk/producer/message.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
module PulsarSdk
module Producer
class Message
prepend ::PulsarSdk::Tweaks::CleanInspect

attr_reader :metadata, :message, :key

def initialize(msg, metadata = nil)
# TODO check metadata type
@message, @metadata = msg, metadata
Expand Down
2 changes: 2 additions & 0 deletions lib/pulsar_sdk/producer/partition.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
module PulsarSdk
module Producer
class Partition
prepend ::PulsarSdk::Tweaks::CleanInspect

def initialize(client, opts)
@topic = opts.topic

Expand Down
2 changes: 2 additions & 0 deletions lib/pulsar_sdk/producer/router.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
module PulsarSdk
module Producer
class Router
prepend ::PulsarSdk::Tweaks::CleanInspect

def initialize(scheme = :string_hash)
case scheme.to_sym
when :string_hash
Expand Down
2 changes: 1 addition & 1 deletion lib/pulsar_sdk/protocol/lookup.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def lookup(topic)
MAX_LOOKUP_TIMES.times do
case Pulsar::Proto::CommandLookupTopicResponse::LookupType.resolve(resp.response)
when Pulsar::Proto::CommandLookupTopicResponse::LookupType::Failed
puts "ERROR: Failed to lookup topic 「#{topic}」, #{resp.error}"
PulsarSdk.logger.error(__method__){"Failed to lookup topic 「#{topic}」, #{resp.error}"}
break
when Pulsar::Proto::CommandLookupTopicResponse::LookupType::Redirect
logical_addr, physical_addr = extract_addr(resp)
Expand Down
1 change: 1 addition & 0 deletions lib/pulsar_sdk/protocol/message.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ module PulsarSdk
module Protocol
class Message
prepend ::PulsarSdk::Tweaks::AssignAttributes
prepend ::PulsarSdk::Tweaks::CleanInspect

attr_accessor :publish_time, :event_time, :partition_key, :payload,
:message_id, :properties, :consumer_id, :topic
Expand Down
2 changes: 1 addition & 1 deletion lib/pulsar_sdk/protocol/partitioned_topic.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ def partitions
pmr = topic_metadata

if !success_response?(pmr)
puts "ERROR: get topic partitioned metadata failed, #{pmr.error}: #{pmr.message}"
PulsarSdk.logger.error(__method__){"Get topic partitioned metadata failed, #{pmr.error}: #{pmr.message}"}
return []
end

Expand Down
2 changes: 2 additions & 0 deletions lib/pulsar_sdk/protocol/reader.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
module PulsarSdk
module Protocol
class Reader
prepend ::PulsarSdk::Tweaks::CleanInspect

FRAME_SIZE_LEN = 4
CMD_SIZE_LEN = 4

Expand Down
2 changes: 2 additions & 0 deletions lib/pulsar_sdk/protocol/structure.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
module PulsarSdk
module Protocol
class Structure
prepend ::PulsarSdk::Tweaks::CleanInspect

# [MAGIC_NUMBER] [CHECKSUM] [METADATA_SIZE] [METADATA] [PAYLOAD]
MAGIC_NUMBER = [0x0e, 0x01].pack('C*').freeze
MAGIC_NUMBER_LEN = MAGIC_NUMBER.size
Expand Down
1 change: 1 addition & 0 deletions lib/pulsar_sdk/tweaks.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
require 'pulsar_sdk/tweaks/assign_attributes'
require 'pulsar_sdk/tweaks/clean_inspect'
require 'pulsar_sdk/tweaks/base_command'
require 'pulsar_sdk/tweaks/time_at_microsecond'
require 'pulsar_sdk/tweaks/timeout_queue'
Expand Down
2 changes: 1 addition & 1 deletion lib/pulsar_sdk/tweaks/base_command.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ module BaseCommand
[:request_id, :consumer_id, :producer_id, :sequence_id].each do |x|
define_method "set_#{x}" do
if self.seq_generator.nil?
puts "ERROR: seq_generator was not set!! #{__method__} has no effect"
PulsarSdk.logger.warn(__method__){"seq_generator was not set!! action has no effect"}
return
end

Expand Down
15 changes: 15 additions & 0 deletions lib/pulsar_sdk/tweaks/clean_inspect.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
module PulsarSdk
module Tweaks
module CleanInspect
def inspect
s = instance_variables.map do |x|
v = instance_variable_get(x)
next if v.is_a?(Proc)
"#{x}: #{v}"
end.join(', ')

"<#{self.class.name} #{s}>"
end
end
end
end
2 changes: 1 addition & 1 deletion lib/pulsar_sdk/version.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
module PulsarSdk
VERSION = "0.6.7"
VERSION = "0.7.0"
end

0 comments on commit fa8b311

Please sign in to comment.