Skip to content

Commit

Permalink
fix: make it possible to wire up the request wrapper and created obje…
Browse files Browse the repository at this point in the history
…ct store clients from enterprise logs (#12134)
  • Loading branch information
MasslessParticle authored Mar 6, 2024
1 parent 1556cc0 commit bf11a7e
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 11 deletions.
5 changes: 3 additions & 2 deletions pkg/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,10 +325,11 @@ type Loki struct {
bloomCompactorRingManager *lokiring.RingManager
bloomGatewayRingManager *lokiring.RingManager

clientMetrics storage.ClientMetrics
ClientMetrics storage.ClientMetrics
deleteClientMetrics *deletion.DeleteRequestClientMetrics

Tee distributor.Tee
PushParserWrapper push.RequestParserWrapper
HTTPAuthMiddleware middleware.Interface

Codec Codec
Expand All @@ -341,7 +342,7 @@ type Loki struct {
func New(cfg Config) (*Loki, error) {
loki := &Loki{
Cfg: cfg,
clientMetrics: storage.NewClientMetrics(),
ClientMetrics: storage.NewClientMetrics(),
deleteClientMetrics: deletion.NewDeleteRequestClientMetrics(prometheus.DefaultRegisterer),
Codec: queryrange.DefaultCodec,
}
Expand Down
22 changes: 13 additions & 9 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,10 @@ func (t *Loki) initDistributor() (services.Service, error) {
return nil, err
}

if t.PushParserWrapper != nil {
t.distributor.RequestParserWrapper = t.PushParserWrapper
}

// Register the distributor to receive Push requests over GRPC
// EXCEPT when running with `-target=all` or `-target=` contains `ingester`
if !t.Cfg.isModuleEnabled(All) && !t.Cfg.isModuleEnabled(Write) && !t.Cfg.isModuleEnabled(Ingester) {
Expand Down Expand Up @@ -610,7 +614,7 @@ func (t *Loki) initTableManager() (services.Service, error) {

reg := prometheus.WrapRegistererWith(prometheus.Labels{"component": "table-manager-store"}, prometheus.DefaultRegisterer)

tableClient, err := storage.NewTableClient(lastConfig.IndexType, *lastConfig, t.Cfg.StorageConfig, t.clientMetrics, reg, util_log.Logger)
tableClient, err := storage.NewTableClient(lastConfig.IndexType, *lastConfig, t.Cfg.StorageConfig, t.ClientMetrics, reg, util_log.Logger)
if err != nil {
return nil, err
}
Expand All @@ -636,7 +640,7 @@ func (t *Loki) initStore() (services.Service, error) {
}
}

store, err := storage.NewStore(t.Cfg.StorageConfig, t.Cfg.ChunkStoreConfig, t.Cfg.SchemaConfig, t.Overrides, t.clientMetrics, prometheus.DefaultRegisterer, util_log.Logger, t.Cfg.MetricsNamespace)
store, err := storage.NewStore(t.Cfg.StorageConfig, t.Cfg.ChunkStoreConfig, t.Cfg.SchemaConfig, t.Overrides, t.ClientMetrics, prometheus.DefaultRegisterer, util_log.Logger, t.Cfg.MetricsNamespace)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -677,7 +681,7 @@ func (t *Loki) initBloomStore() (services.Service, error) {
level.Warn(logger).Log("msg", "failed to preload blocks cache", "err", err)
}

t.BloomStore, err = bloomshipper.NewBloomStore(t.Cfg.SchemaConfig.Configs, t.Cfg.StorageConfig, t.clientMetrics, metasCache, blocksCache, reg, logger)
t.BloomStore, err = bloomshipper.NewBloomStore(t.Cfg.SchemaConfig.Configs, t.Cfg.StorageConfig, t.ClientMetrics, metasCache, blocksCache, reg, logger)
if err != nil {
return nil, fmt.Errorf("failed to create bloom store: %w", err)
}
Expand Down Expand Up @@ -1091,7 +1095,7 @@ func (t *Loki) initRulerStorage() (_ services.Service, err error) {
}
}

t.RulerStorage, err = base_ruler.NewLegacyRuleStore(t.Cfg.Ruler.StoreConfig, t.Cfg.StorageConfig.Hedging, t.clientMetrics, ruler.GroupLoader{}, util_log.Logger)
t.RulerStorage, err = base_ruler.NewLegacyRuleStore(t.Cfg.Ruler.StoreConfig, t.Cfg.StorageConfig.Hedging, t.ClientMetrics, ruler.GroupLoader{}, util_log.Logger)

return
}
Expand Down Expand Up @@ -1265,7 +1269,7 @@ func (t *Loki) initCompactor() (services.Service, error) {
continue
}

objectClient, err := storage.NewObjectClient(periodConfig.ObjectType, t.Cfg.StorageConfig, t.clientMetrics)
objectClient, err := storage.NewObjectClient(periodConfig.ObjectType, t.Cfg.StorageConfig, t.ClientMetrics)
if err != nil {
return nil, fmt.Errorf("failed to create object client: %w", err)
}
Expand All @@ -1276,7 +1280,7 @@ func (t *Loki) initCompactor() (services.Service, error) {
var deleteRequestStoreClient client.ObjectClient
if t.Cfg.CompactorConfig.RetentionEnabled {
if deleteStore := t.Cfg.CompactorConfig.DeleteRequestStore; deleteStore != "" {
if deleteRequestStoreClient, err = storage.NewObjectClient(deleteStore, t.Cfg.StorageConfig, t.clientMetrics); err != nil {
if deleteRequestStoreClient, err = storage.NewObjectClient(deleteStore, t.Cfg.StorageConfig, t.ClientMetrics); err != nil {
return nil, fmt.Errorf("failed to create delete request store object client: %w", err)
}
} else {
Expand Down Expand Up @@ -1366,7 +1370,7 @@ func (t *Loki) initIndexGateway() (services.Service, error) {
}
tableRange := period.GetIndexTableNumberRange(periodEndTime)

indexClient, err := storage.NewIndexClient(period, tableRange, t.Cfg.StorageConfig, t.Cfg.SchemaConfig, t.Overrides, t.clientMetrics, shardingStrategy,
indexClient, err := storage.NewIndexClient(period, tableRange, t.Cfg.StorageConfig, t.Cfg.SchemaConfig, t.Overrides, t.ClientMetrics, shardingStrategy,
prometheus.DefaultRegisterer, log.With(util_log.Logger, "index-store", fmt.Sprintf("%s-%s", period.IndexType, period.From.String())), t.Cfg.MetricsNamespace,
)
if err != nil {
Expand Down Expand Up @@ -1465,7 +1469,7 @@ func (t *Loki) initBloomCompactor() (services.Service, error) {
t.Cfg.BloomCompactor,
t.Cfg.SchemaConfig,
t.Cfg.StorageConfig,
t.clientMetrics,
t.ClientMetrics,
t.Store,
shuffleSharding,
t.Overrides,
Expand Down Expand Up @@ -1571,7 +1575,7 @@ func (t *Loki) initAnalytics() (services.Service, error) {
return nil, err
}

objectClient, err := storage.NewObjectClient(period.ObjectType, t.Cfg.StorageConfig, t.clientMetrics)
objectClient, err := storage.NewObjectClient(period.ObjectType, t.Cfg.StorageConfig, t.ClientMetrics)
if err != nil {
level.Info(util_log.Logger).Log("msg", "failed to initialize usage report", "err", err)
return nil, nil
Expand Down

0 comments on commit bf11a7e

Please sign in to comment.