diff --git a/input_kafka.go b/input_kafka.go index a4c2fa22..593539f8 100644 --- a/input_kafka.go +++ b/input_kafka.go @@ -3,7 +3,9 @@ package goreplay import ( "encoding/json" "log" + "strconv" "strings" + "time" "github.com/Shopify/sarama" "github.com/Shopify/sarama/mocks" @@ -12,14 +14,24 @@ import ( // KafkaInput is used for receiving Kafka messages and // transforming them into HTTP payloads. type KafkaInput struct { - config *InputKafkaConfig - consumers []sarama.PartitionConsumer - messages chan *sarama.ConsumerMessage - quit chan struct{} + config *InputKafkaConfig + consumers []sarama.PartitionConsumer + messages chan *sarama.ConsumerMessage + speedFactor float64 + quit chan struct{} + kafkaTimer *kafkaTimer +} + +func getOffsetOfPartitions(offsetCfg string) int64 { + offset, err := strconv.ParseInt(offsetCfg, 10, 64) + if err != nil || offset < -2 { + log.Fatalln("Failed to parse offset: "+offsetCfg, err) + } + return offset } // NewKafkaInput creates instance of kafka consumer client with TLS config -func NewKafkaInput(_ string, config *InputKafkaConfig, tlsConfig *KafkaTLSConfig) *KafkaInput { +func NewKafkaInput(offsetCfg string, config *InputKafkaConfig, tlsConfig *KafkaTLSConfig) *KafkaInput { c := NewKafkaConfig(&config.SASLConfig, tlsConfig) var con sarama.Consumer @@ -41,14 +53,17 @@ func NewKafkaInput(_ string, config *InputKafkaConfig, tlsConfig *KafkaTLSConfig } i := &KafkaInput{ - config: config, - consumers: make([]sarama.PartitionConsumer, len(partitions)), - messages: make(chan *sarama.ConsumerMessage, 256), - quit: make(chan struct{}), + config: config, + consumers: make([]sarama.PartitionConsumer, len(partitions)), + messages: make(chan *sarama.ConsumerMessage, 256), + speedFactor: 1, + quit: make(chan struct{}), + kafkaTimer: new(kafkaTimer), } + i.config.Offset = offsetCfg for index, partition := range partitions { - consumer, err := con.ConsumePartition(config.Topic, partition, sarama.OffsetNewest) + consumer, err := con.ConsumePartition(config.Topic, partition, getOffsetOfPartitions(offsetCfg)) if err != nil { log.Fatalln("Failed to start Sarama(Kafka) partition consumer:", err) } @@ -86,12 +101,15 @@ func (i *KafkaInput) PluginRead() (*Message, error) { case message = <-i.messages: } + inputTs := "" + msg.Data = message.Value if i.config.UseJSON { var kafkaMessage KafkaMessage json.Unmarshal(message.Value, &kafkaMessage) + inputTs = kafkaMessage.ReqTs var err error msg.Data, err = kafkaMessage.Dump() if err != nil { @@ -103,8 +121,11 @@ func (i *KafkaInput) PluginRead() (*Message, error) { // does it have meta if isOriginPayload(msg.Data) { msg.Meta, msg.Data = payloadMetaWithBody(msg.Data) + inputTs = string(payloadMeta(msg.Meta)[2]) } + i.timeWait(inputTs) + return &msg, nil } @@ -118,3 +139,45 @@ func (i *KafkaInput) Close() error { close(i.quit) return nil } + +func (i *KafkaInput) timeWait(curInputTs string) { + if i.config.Offset == "-1" || curInputTs == "" { + return + } + + // implement for Kafka input showdown or speedup emitting + timer := i.kafkaTimer + curTs := time.Now().UnixNano() + + curInput, err := strconv.ParseInt(curInputTs, 10, 64) + + if timer.latestInputTs == 0 || timer.latestOutputTs == 0 { + timer.latestInputTs = curInput + timer.latestOutputTs = curTs + return + } + + if err != nil { + log.Fatalln("Fatal to parse timestamp err: ", err) + } + + diffTs := curInput - timer.latestInputTs + pastTs := curTs - timer.latestOutputTs + + diff := diffTs - pastTs + if i.speedFactor != 1 { + diff = int64(float64(diff) / i.speedFactor) + } + + if diff > 0 { + time.Sleep(time.Duration(diff)) + } + + timer.latestInputTs = curInput + timer.latestOutputTs = curTs +} + +type kafkaTimer struct { + latestInputTs int64 + latestOutputTs int64 +} diff --git a/input_kafka_test.go b/input_kafka_test.go index d632cf82..80ba5b4b 100644 --- a/input_kafka_test.go +++ b/input_kafka_test.go @@ -16,7 +16,7 @@ func TestInputKafkaRAW(t *testing.T) { map[string][]int32{"test": {0}}, ) - input := NewKafkaInput("", &InputKafkaConfig{ + input := NewKafkaInput("-1", &InputKafkaConfig{ consumer: consumer, Topic: "test", UseJSON: false, @@ -42,7 +42,7 @@ func TestInputKafkaJSON(t *testing.T) { map[string][]int32{"test": {0}}, ) - input := NewKafkaInput("", &InputKafkaConfig{ + input := NewKafkaInput("-1", &InputKafkaConfig{ consumer: consumer, Topic: "test", UseJSON: true, diff --git a/kafka.go b/kafka.go index f1497701..ca0e83ba 100644 --- a/kafka.go +++ b/kafka.go @@ -31,6 +31,7 @@ type InputKafkaConfig struct { Host string `json:"input-kafka-host"` Topic string `json:"input-kafka-topic"` UseJSON bool `json:"input-kafka-json-format"` + Offset string `json:"input-kafka-offset"` SASLConfig SASLKafkaConfig } diff --git a/limiter.go b/limiter.go index 7ee48547..4eff7b07 100644 --- a/limiter.go +++ b/limiter.go @@ -31,6 +31,22 @@ func parseLimitOptions(options string) (limit int, isPercent bool) { return } +func newLimiterExceptions(l *Limiter) { + + if !l.isPercent { + return + } + speedFactor := float64(l.limit) / float64(100) + + // FileInput、KafkaInput have its own rate limiting. Unlike other inputs we not just dropping requests, we can slow down or speed up request emittion. + switch input := l.plugin.(type) { + case *FileInput: + input.speedFactor = speedFactor + case *KafkaInput: + input.speedFactor = speedFactor + } +} + // NewLimiter constructor for Limiter, accepts plugin and options // `options` allow to sprcify relatve or absolute limiting func NewLimiter(plugin interface{}, options string) PluginReadWriter { @@ -39,17 +55,28 @@ func NewLimiter(plugin interface{}, options string) PluginReadWriter { l.plugin = plugin l.currentTime = time.Now().UnixNano() - // FileInput have its own rate limiting. Unlike other inputs we not just dropping requests, we can slow down or speed up request emittion. - if fi, ok := l.plugin.(*FileInput); ok && l.isPercent { - fi.speedFactor = float64(l.limit) / float64(100) - } + newLimiterExceptions(l) return l } +func (l *Limiter) isLimitedExceptions() bool { + if !l.isPercent { + return false + } + // Fileinput、Kafkainput have its own limiting algorithm + switch l.plugin.(type) { + case *FileInput: + return true + case *KafkaInput: + return true + default: + return false + } +} + func (l *Limiter) isLimited() bool { - // File input have its own limiting algorithm - if _, ok := l.plugin.(*FileInput); ok && l.isPercent { + if l.isLimitedExceptions() { return false } diff --git a/plugins.go b/plugins.go index d37b6832..c83a99ce 100644 --- a/plugins.go +++ b/plugins.go @@ -159,7 +159,7 @@ func NewPlugins() *InOutPlugins { } if Settings.InputKafkaConfig.Host != "" && Settings.InputKafkaConfig.Topic != "" { - plugins.registerPlugin(NewKafkaInput, "", &Settings.InputKafkaConfig, &Settings.KafkaTLSConfig) + plugins.registerPlugin(NewKafkaInput, Settings.InputKafkaConfig.Offset, &Settings.InputKafkaConfig, &Settings.KafkaTLSConfig) } return plugins diff --git a/settings.go b/settings.go index 884803e6..47951e34 100644 --- a/settings.go +++ b/settings.go @@ -250,6 +250,7 @@ func init() { flag.StringVar(&Settings.InputKafkaConfig.SASLConfig.Mechanism, "input-kafka-mechanism", "", "mechanism\n\tgor --input-raw :8080 --output-kafka-mechanism 'SCRAM-SHA-512'") flag.StringVar(&Settings.InputKafkaConfig.SASLConfig.Username, "input-kafka-username", "", "username\n\tgor --input-raw :8080 --output-kafka-username 'username'") flag.StringVar(&Settings.InputKafkaConfig.SASLConfig.Password, "input-kafka-password", "", "password\n\tgor --input-raw :8080 --output-kafka-password 'password'") + flag.StringVar(&Settings.InputKafkaConfig.Offset, "input-kafka-offset", "-1", "Specify offset in Kafka partitions start to consume\n\t-1: Starts from newest, -2: Starts from oldest\nAnd supported for showdown or speedup for emitting!\n\tgor --input-kafka-offset \"-2|200%\"") flag.StringVar(&Settings.KafkaTLSConfig.CACert, "kafka-tls-ca-cert", "", "CA certificate for Kafka TLS Config:\n\tgor --input-raw :3000 --output-kafka-host '192.168.0.1:9092' --output-kafka-topic 'topic' --kafka-tls-ca-cert cacert.cer.pem --kafka-tls-client-cert client.cer.pem --kafka-tls-client-key client.key.pem") flag.StringVar(&Settings.KafkaTLSConfig.ClientCert, "kafka-tls-client-cert", "", "Client certificate for Kafka TLS Config (mandatory with to kafka-tls-ca-cert and kafka-tls-client-key)")