Skip to content

Commit

Permalink
Feature: Modify Default Producer Compression Type
Browse files Browse the repository at this point in the history
  • Loading branch information
real-mday committed Nov 12, 2024
1 parent 0efe580 commit 5f1e16a
Show file tree
Hide file tree
Showing 7 changed files with 91 additions and 2 deletions.
33 changes: 33 additions & 0 deletions backend/pkg/api/handle_producer_compression_type.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2024 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file https://github.com/redpanda-data/redpanda/blob/dev/licenses/bsl.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

package api

import (
"net/http"

"github.com/cloudhut/common/rest"
"go.uber.org/zap"
)

// handleGetProducerCompressionType returns the default producer compression type
func (api *API) handleGetProducerCompressionType() http.HandlerFunc {
type response struct {
ProducerCompressionType string `json:"producerCompressionType"`
}

return func(w http.ResponseWriter, r *http.Request) {
producerCompressionType := api.Cfg.Kafka.PComp.Value
logger := api.Logger.With(zap.String("producer_compression_type", producerCompressionType))

rest.SendResponse(w, r, logger, http.StatusOK, &response{
ProducerCompressionType: producerCompressionType,
})
}
}
3 changes: 3 additions & 0 deletions backend/pkg/api/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,9 @@ func (api *API) routes() *chi.Mux {
r.Patch("/operations/reassign-partitions", api.handlePatchPartitionAssignments())
r.Patch("/operations/configs", api.handlePatchConfigs())

// Producer Compression Type Default
r.Get("/producer-compression-type", api.handleGetProducerCompressionType())

// Schema Registry
r.Get("/schema-registry/mode", api.handleGetSchemaRegistryMode())
r.Get("/schema-registry/config", api.handleGetSchemaRegistryConfig())
Expand Down
1 change: 1 addition & 0 deletions backend/pkg/config/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type Kafka struct {

// Schema Registry
Schema Schema `yaml:"schemaRegistry"`
PComp PComp `yaml:"producerCompressionType"`
Protobuf Proto `yaml:"protobuf"`
MessagePack Msgpack `yaml:"messagePack"`
Cbor Cbor `yaml:"cbor"`
Expand Down
20 changes: 20 additions & 0 deletions backend/pkg/config/pcomp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Copyright 2024 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

package config

// PComp represents the producer compression type default.
type PComp struct {
Value string `yaml:"value"`
}

// SetDefaults for the producer compression type configuration.
func (c *PComp) SetDefaults() {
c.Value = "snappy"
}
3 changes: 2 additions & 1 deletion frontend/src/components/pages/topics/Topic.Produce.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ const PublishTopicForm: FC<{ topicName: string }> = observer(({ topicName }) =>
} = useForm<Inputs>({
defaultValues: {
partition: -1,
compressionType: CompressionType.SNAPPY,
compressionType: api.producerCompressionType,
headers: [],
key: {
data: '',
Expand Down Expand Up @@ -583,6 +583,7 @@ export class TopicProducePage extends PageComponent<{ topicName: string }> {

refreshData(force?: boolean) {
api.refreshSchemaSubjects(force);
api.refreshProducerCompressionType(force);
}

render() {
Expand Down
28 changes: 28 additions & 0 deletions frontend/src/state/backendApi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ import {
PatchTopicConfigsRequest,
Payload,
ProduceRecordsResponse,
ProducerCompressionTypeResponse,
PublishRecordsRequest,
QuotaResponse,
ResourceConfig,
Expand Down Expand Up @@ -324,6 +325,8 @@ const apiStore = {
topicConsumers: new Map<string, TopicConsumer[]>(),
topicAcls: new Map<string, GetAclOverviewResponse | null>(),

producerCompressionType: undefined as ProtoCompressionType | undefined,

serviceAccounts: undefined as GetUsersResponse | undefined | null,
ACLs: undefined as GetAclOverviewResponse | undefined | null,

Expand Down Expand Up @@ -458,6 +461,31 @@ const apiStore = {
}, addError);
},

refreshProducerCompressionType(force?: boolean) {
cachedApiRequest<ProducerCompressionTypeResponse>(`${appConfig.restBasePath}/producer-compression-type`, force)
.then(r => {
if (r.producerCompressionType!=null && r.producerCompressionType.includes('snappy')) {
this.producerCompressionType = ProtoCompressionType.SNAPPY;
return
} if (r.producerCompressionType!=null && r.producerCompressionType?.includes('uncompressed')) {
this.producerCompressionType = ProtoCompressionType.UNCOMPRESSED;
return
} if (r.producerCompressionType!=null && r.producerCompressionType?.includes('gzip')) {
this.producerCompressionType = ProtoCompressionType.GZIP;
return
} if (r.producerCompressionType!=null && r.producerCompressionType?.includes('lz4')) {
this.producerCompressionType = ProtoCompressionType.LZ4;
return
} if (r.producerCompressionType!=null && r.producerCompressionType?.includes('zstd')) {
this.producerCompressionType = ProtoCompressionType.ZSTD;
return
} else {
this.producerCompressionType = ProtoCompressionType.SNAPPY //default to snappy
return
}
}, addError);
},

refreshTopicPermissions(topicName: string, force?: boolean) {
if (!AppFeatures.SINGLE_SIGN_ON) return; // without SSO there can't be a permissions endpoint
if (this.userData?.user?.providerID == -1) return; // debug user
Expand Down
5 changes: 4 additions & 1 deletion frontend/src/state/restInterfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,10 @@ export interface TopicDocumentationResponse {
documentation: TopicDocumentation;
}


// GET /producer-compression-type
export type ProducerCompressionTypeResponse = {
producerCompressionType: string | null;
};


export interface GroupMemberAssignment {
Expand Down

0 comments on commit 5f1e16a

Please sign in to comment.