Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Downgrade the protocol of connection when protocol break #181

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ Release Notes.
* Reduce unessential `conntrack` query when detect new connection.
* Reduce CPU and memory usage in the access log module.
* Reduce handle connection event time in the access log module.
* Downgrade the protocol of connection when protocol break in the access log module.

#### Bug Fixes
* Fix the base image cannot run in the arm64.
Expand Down
12 changes: 11 additions & 1 deletion bpf/include/protocol_analyzer.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@ static __inline __u32 infer_http2_message(const char* buf, size_t count) {
bpf_probe_read(frame, sizeof(frame), buf + frameOffset);
frameOffset += (bpf_ntohl(*(__u32 *) frame) >> 8) + kFrameBasicSize;

// frametype only accept 0x00 - 0x09
if (frame[3] > 0x09) {
return CONNECTION_MESSAGE_TYPE_UNKNOWN;
}
// is header frame
if (frame[3] != kFrameTypeHeader) {
continue;
Expand All @@ -135,9 +139,15 @@ static __inline __u32 infer_http2_message(const char* buf, size_t count) {
return CONNECTION_MESSAGE_TYPE_UNKNOWN;
}

// stream ID cannot be 0
__u32 streamID = ((frame[5] & 0x7F) << 24) | (frame[6] << 16) | (frame[7] << 8) | frame[8];
if (streamID == 0) {
return CONNECTION_MESSAGE_TYPE_UNKNOWN;
}

// locate the header block fragment offset
headerBlockFragmentOffset = kFrameBasicSize;
if (frame[4] & 0x20) { // PADDED flag is set
if (frame[4] & 0x08) { // PADDED flag is set
headerBlockFragmentOffset += 1;
}
if (frame[4] & 0x20) { // PRIORITY flag is set
Expand Down
26 changes: 16 additions & 10 deletions pkg/accesslog/collector/protocols/http1.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,21 +36,24 @@ import (
var http1Log = logger.GetLogger("accesslog", "collector", "protocols", "http1")
var http1AnalyzeMaxRetryCount = 3

type HTTP1ProtocolAnalyze func(metrics *HTTP1Metrics, connection *PartitionConnection,
request *reader.Request, response *reader.Response) error
type HTTP1ProtocolAnalyzer interface {
HandleHTTPData(metrics *HTTP1Metrics, connection *PartitionConnection,
request *reader.Request, response *reader.Response) error
OnProtocolBreak(metrics *HTTP1Metrics, connection *PartitionConnection)
}

type HTTP1Protocol struct {
ctx *common.AccessLogContext
analyze HTTP1ProtocolAnalyze
reader *reader.Reader
ctx *common.AccessLogContext
analyzer HTTP1ProtocolAnalyzer
reader *reader.Reader
}

func NewHTTP1Analyzer(ctx *common.AccessLogContext, analyze HTTP1ProtocolAnalyze) *HTTP1Protocol {
func NewHTTP1Analyzer(ctx *common.AccessLogContext, analyze HTTP1ProtocolAnalyzer) *HTTP1Protocol {
protocol := &HTTP1Protocol{ctx: ctx, reader: reader.NewReader()}
if analyze == nil {
protocol.analyze = protocol.HandleHTTPData
protocol.analyzer = protocol
} else {
protocol.analyze = analyze
protocol.analyzer = analyze
}
return protocol
}
Expand Down Expand Up @@ -174,7 +177,7 @@ func (p *HTTP1Protocol) handleResponse(metrics *HTTP1Metrics, connection *Partit
}

// getting the request and response, then send to the forwarder
if analyzeError := p.analyze(metrics, connection, request, response); analyzeError != nil {
if analyzeError := p.analyzer.HandleHTTPData(metrics, connection, request, response); analyzeError != nil {
p.appendAnalyzeUnFinished(metrics, request, response)
}
return enums.ParseResultSuccess, nil
Expand All @@ -191,7 +194,7 @@ func (p *HTTP1Protocol) appendAnalyzeUnFinished(metrics *HTTP1Metrics, request *
func (p *HTTP1Protocol) handleUnFinishedEvents(m *HTTP1Metrics, connection *PartitionConnection) {
for element := m.analyzeUnFinished.Front(); element != nil; {
unFinished := element.Value.(*HTTP1AnalyzeUnFinished)
err := p.analyze(m, connection, unFinished.request, unFinished.response)
err := p.analyzer.HandleHTTPData(m, connection, unFinished.request, unFinished.response)
if err != nil {
unFinished.retryCount++
if unFinished.retryCount < http1AnalyzeMaxRetryCount {
Expand Down Expand Up @@ -266,6 +269,9 @@ func (p *HTTP1Protocol) HandleHTTPData(metrics *HTTP1Metrics, connection *Partit
return nil
}

func (p *HTTP1Protocol) OnProtocolBreak(metrics *HTTP1Metrics, connection *PartitionConnection) {
}

func (p *HTTP1Protocol) CloseStream(ioReader io.Closer) {
if ioReader != nil {
_ = ioReader.Close()
Expand Down
74 changes: 41 additions & 33 deletions pkg/accesslog/collector/protocols/http2.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,29 +43,32 @@ var maxHTTP2StreamingTime = time.Minute * 3

var http2Log = logger.GetLogger("accesslog", "collector", "protocols", "http2")

type HTTP2StreamAnalyze func(stream *HTTP2Streaming) error
type HTTP2StreamAnalyzer interface {
HandleWholeStream(connection *PartitionConnection, stream *HTTP2Streaming) error
OnProtocolBreak(connection *PartitionConnection, metrics *HTTP2Metrics)
}

type HTTP2Protocol struct {
ctx *common.AccessLogContext
analyze HTTP2StreamAnalyze
ctx *common.AccessLogContext
analyzer HTTP2StreamAnalyzer
}

func NewHTTP2Analyzer(ctx *common.AccessLogContext, analyze HTTP2StreamAnalyze) *HTTP2Protocol {
func NewHTTP2Analyzer(ctx *common.AccessLogContext, analyzer HTTP2StreamAnalyzer) *HTTP2Protocol {
protocol := &HTTP2Protocol{ctx: ctx}
if analyze == nil {
protocol.analyze = protocol.handleWholeStream
if analyzer == nil {
protocol.analyzer = protocol
} else {
protocol.analyze = analyze
protocol.analyzer = analyzer
}
return protocol
}

type HTTP2Metrics struct {
connectionID uint64
randomID uint64
hpackDecoder *hpack.Decoder
ConnectionID uint64
RandomID uint64
HpackDecoder *hpack.Decoder

streams map[uint32]*HTTP2Streaming
Streams map[uint32]*HTTP2Streaming
}

type HTTP2Streaming struct {
Expand All @@ -82,18 +85,18 @@ type HTTP2Streaming struct {

func (r *HTTP2Protocol) GenerateConnection(connectionID, randomID uint64) ProtocolMetrics {
return &HTTP2Metrics{
connectionID: connectionID,
randomID: randomID,
hpackDecoder: hpack.NewDecoder(4096, nil),
streams: make(map[uint32]*HTTP2Streaming),
ConnectionID: connectionID,
RandomID: randomID,
HpackDecoder: hpack.NewDecoder(4096, nil),
Streams: make(map[uint32]*HTTP2Streaming),
}
}

func (r *HTTP2Protocol) Analyze(connection *PartitionConnection, helper *AnalyzeHelper) error {
http2Metrics := connection.Metrics(enums.ConnectionProtocolHTTP2).(*HTTP2Metrics)
buf := connection.Buffer(enums.ConnectionProtocolHTTP2)
http2Log.Debugf("ready to analyze HTTP/2 protocol data, connection ID: %d, random ID: %d",
http2Metrics.connectionID, http2Metrics.randomID)
http2Metrics.ConnectionID, http2Metrics.RandomID)
buf.ResetForLoopReading()
for {
if !buf.PrepareForReading() {
Expand All @@ -115,9 +118,9 @@ func (r *HTTP2Protocol) Analyze(connection *PartitionConnection, helper *Analyze
var result enums.ParseResult
switch header.Type {
case http2.FrameHeaders:
result, protocolBreak, _ = r.handleHeader(connection, &header, startPosition, http2Metrics, buf)
result, protocolBreak, _ = r.HandleHeader(connection, &header, startPosition, http2Metrics, buf)
case http2.FrameData:
result, protocolBreak, _ = r.handleData(&header, startPosition, http2Metrics, buf)
result, protocolBreak, _ = r.HandleData(connection, &header, startPosition, http2Metrics, buf)
default:
tmp := make([]byte, header.Length)
if err := buf.ReadUntilBufferFull(tmp); err != nil {
Expand All @@ -134,8 +137,9 @@ func (r *HTTP2Protocol) Analyze(connection *PartitionConnection, helper *Analyze
// if the protocol break, then stop the loop and notify the caller to skip analyze all data(just sending the detail)
if protocolBreak {
http2Log.Warnf("the HTTP/2 protocol break, maybe not tracing the connection from beginning, skip all data analyze in this connection, "+
"connection ID: %d", http2Metrics.connectionID)
"connection ID: %d", http2Metrics.ConnectionID)
helper.ProtocolBreak = true
r.analyzer.OnProtocolBreak(connection, http2Metrics)
break
}

Expand All @@ -159,19 +163,19 @@ func (r *HTTP2Protocol) ForProtocol() enums.ConnectionProtocol {
return enums.ConnectionProtocolHTTP2
}

func (r *HTTP2Protocol) handleHeader(connection *PartitionConnection, header *http2.FrameHeader, startPos *buffer.Position,
func (r *HTTP2Protocol) HandleHeader(connection *PartitionConnection, header *http2.FrameHeader, startPos *buffer.Position,
metrics *HTTP2Metrics, buf *buffer.Buffer) (enums.ParseResult, bool, error) {
bytes := make([]byte, header.Length)
if err := buf.ReadUntilBufferFull(bytes); err != nil {
return enums.ParseResultSkipPackage, false, err
return enums.ParseResultSkipPackage, true, err
}
headerData, err := metrics.hpackDecoder.DecodeFull(bytes)
headerData, err := metrics.HpackDecoder.DecodeFull(bytes)
if err != nil {
// reading the header failure, maybe not tracing the connection from beginning
return enums.ParseResultSkipPackage, true, err
}
// saving stream
streaming := metrics.streams[header.StreamID]
streaming := metrics.Streams[header.StreamID]
headers := r.parseHeaders(headerData)
if streaming == nil {
streaming = &HTTP2Streaming{
Expand All @@ -180,7 +184,7 @@ func (r *HTTP2Protocol) handleHeader(connection *PartitionConnection, header *ht
ReqHeaderBuffer: buf.Slice(true, startPos, buf.Position()),
Connection: connection,
}
metrics.streams[header.StreamID] = streaming
metrics.Streams[header.StreamID] = streaming
return enums.ParseResultSuccess, false, nil
}

Expand All @@ -207,14 +211,15 @@ func (r *HTTP2Protocol) handleHeader(connection *PartitionConnection, header *ht
// is end of stream and in the response
if header.Flags.Has(http2.FlagHeadersEndStream) {
// should be end of the stream and send to the protocol
_ = r.analyze(streaming)
_ = r.analyzer.HandleWholeStream(connection, streaming)
// delete streaming
delete(metrics.streams, header.StreamID)
delete(metrics.Streams, header.StreamID)
}
return enums.ParseResultSuccess, false, nil
}

func (r *HTTP2Protocol) validateIsStreamOpenTooLong(metrics *HTTP2Metrics, id uint32, streaming *HTTP2Streaming) {
func (r *HTTP2Protocol) validateIsStreamOpenTooLong(connection *PartitionConnection,
metrics *HTTP2Metrics, id uint32, streaming *HTTP2Streaming) {
// if in the response mode or the request body is not nil, then skip
if streaming.IsInResponse || streaming.ReqBodyBuffer == nil {
return
Expand All @@ -227,9 +232,9 @@ func (r *HTTP2Protocol) validateIsStreamOpenTooLong(metrics *HTTP2Metrics, id ui
}
if time.Since(host.Time(socketBuffer.StartTime())) > maxHTTP2StreamingTime {
http2Log.Infof("detect the HTTP/2 stream is too long, split the stream, connection ID: %d, stream ID: %d, headers: %v",
metrics.connectionID, id, streaming.ReqHeader)
metrics.ConnectionID, id, streaming.ReqHeader)

_ = r.analyze(streaming)
_ = r.analyzer.HandleWholeStream(connection, streaming)

// clean sent buffers
if streaming.ReqBodyBuffer != nil {
Expand All @@ -238,7 +243,7 @@ func (r *HTTP2Protocol) validateIsStreamOpenTooLong(metrics *HTTP2Metrics, id ui
}
}

func (r *HTTP2Protocol) handleWholeStream(stream *HTTP2Streaming) error {
func (r *HTTP2Protocol) HandleWholeStream(_ *PartitionConnection, stream *HTTP2Streaming) error {
details := make([]events.SocketDetail, 0)
var allInclude = true
var idRange *buffer.DataIDRange
Expand Down Expand Up @@ -285,6 +290,9 @@ func (r *HTTP2Protocol) handleWholeStream(stream *HTTP2Streaming) error {
return nil
}

func (r *HTTP2Protocol) OnProtocolBreak(connection *PartitionConnection, metrics *HTTP2Metrics) {
}

func (r *HTTP2Protocol) ParseHTTPMethod(streaming *HTTP2Streaming) v3.AccessLogHTTPProtocolRequestMethod {
method := streaming.ReqHeader[":method"]
if method == "" {
Expand Down Expand Up @@ -318,10 +326,10 @@ func (r *HTTP2Protocol) AppendHeaders(exist, needAppends map[string]string) {
}
}

func (r *HTTP2Protocol) handleData(header *http2.FrameHeader, startPos *buffer.Position,
func (r *HTTP2Protocol) HandleData(connection *PartitionConnection, header *http2.FrameHeader, startPos *buffer.Position,
metrics *HTTP2Metrics, buf *buffer.Buffer) (enums.ParseResult, bool, error) {
bytes := make([]byte, header.Length)
streaming := metrics.streams[header.StreamID]
streaming := metrics.Streams[header.StreamID]
if streaming == nil {
// cannot found the stream, maybe not tracing the connection from beginning
return enums.ParseResultSkipPackage, true, nil
Expand All @@ -335,7 +343,7 @@ func (r *HTTP2Protocol) handleData(header *http2.FrameHeader, startPos *buffer.P
streaming.RespBodyBuffer = buffer.CombineSlices(true, buf, streaming.RespBodyBuffer, buf.Slice(true, startPos, buf.Position()))
}

r.validateIsStreamOpenTooLong(metrics, header.StreamID, streaming)
r.validateIsStreamOpenTooLong(connection, metrics, header.StreamID, streaming)
return enums.ParseResultSuccess, false, nil
}

Expand Down
5 changes: 4 additions & 1 deletion pkg/accesslog/collector/protocols/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,8 +345,11 @@ func (p *PartitionContext) processConnectionEvents(connection *PartitionConnecti
if helper.ProtocolBreak {
// notify the connection manager to skip analyze all data(just sending the detail)
connection.skipAllDataAnalyze = true
p.context.ConnectionMgr.SkipAllDataAnalyze(connection.connectionID, connection.randomID)
p.context.ConnectionMgr.SkipAllDataAnalyzeAndDowngradeProtocol(connection.connectionID, connection.randomID)
for _, buf := range connection.dataBuffers {
for e := buf.BuildDetails().Front(); e != nil; e = e.Next() {
forwarder.SendTransferNoProtocolEvent(p.context, e.Value.(events.SocketDetail))
}
buf.Clean()
}
}
Expand Down
Loading
Loading