Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
* [FEATURE] Distributor: Add experimental `-distributor.enable-start-timestamp` flag for Prometheus Remote Write 2.0. When enabled, `StartTimestamp (ST)` is ingested. #7371
* [FEATURE] Memberlist: Add `-memberlist.cluster-label` and `-memberlist.cluster-label-verification-disabled` to prevent accidental cross-cluster gossip joins and support rolling label rollout. #7385
* [FEATURE] Querier: Add timeout classification to classify query timeouts as 4XX (user error) or 5XX (system error) based on phase timing. When enabled, queries that spend most of their time in PromQL evaluation return `422 Unprocessable Entity` instead of `503 Service Unavailable`. #7374
* [FEATURE] Querier: Implement Resource Based Throttling in Querier. #7442
* [ENHANCEMENT] Ingester: Add WAL record metrics to help evaluate the effectiveness of WAL compression type (e.g. snappy, zstd): `cortex_ingester_tsdb_wal_record_part_writes_total`, `cortex_ingester_tsdb_wal_record_parts_bytes_written_total`, and `cortex_ingester_tsdb_wal_record_bytes_saved_total`. #7420
* [ENHANCEMENT] Distributor: Introduce dynamic `Symbols` slice capacity pooling. #7398 #7401
* [ENHANCEMENT] Metrics Helper: Add native histogram support for aggregating and merging, including dual-format histogram handling that exposes both native and classic bucket formats. #7359
Expand Down
17 changes: 17 additions & 0 deletions docs/blocks-storage/querier.md
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,23 @@ querier:
# Eval time threshold above which a timeout is classified as user error (4XX).
# CLI flag: -querier.timeout-classification-eval-threshold
[timeout_classification_eval_threshold: <duration> | default = 1m30s]

query_protection:
rejection:
threshold:
# EXPERIMENTAL: Max CPU utilization that this ingester can reach before
# rejecting new query request (across all tenants) in percentage,
# between 0 and 1. monitored_resources config must include the resource
# type. 0 to disable.
# CLI flag: -querier.query-protection.rejection.threshold.cpu-utilization
[cpu_utilization: <float> | default = 0]

# EXPERIMENTAL: Max heap utilization that this ingester can reach before
# rejecting new query request (across all tenants) in percentage,
# between 0 and 1. monitored_resources config must include the resource
# type. 0 to disable.
# CLI flag: -querier.query-protection.rejection.threshold.heap-utilization
[heap_utilization: <float> | default = 0]
```

### `blocks_storage_config`
Expand Down
17 changes: 17 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -4989,6 +4989,23 @@ thanos_engine:
# Eval time threshold above which a timeout is classified as user error (4XX).
# CLI flag: -querier.timeout-classification-eval-threshold
[timeout_classification_eval_threshold: <duration> | default = 1m30s]

query_protection:
rejection:
threshold:
# EXPERIMENTAL: Max CPU utilization that this ingester can reach before
# rejecting new query request (across all tenants) in percentage, between
# 0 and 1. monitored_resources config must include the resource type. 0 to
# disable.
# CLI flag: -querier.query-protection.rejection.threshold.cpu-utilization
[cpu_utilization: <float> | default = 0]

# EXPERIMENTAL: Max heap utilization that this ingester can reach before
# rejecting new query request (across all tenants) in percentage, between
# 0 and 1. monitored_resources config must include the resource type. 0 to
# disable.
# CLI flag: -querier.query-protection.rejection.threshold.heap-utilization
[heap_utilization: <float> | default = 0]
```

### `query_frontend_config`
Expand Down
2 changes: 1 addition & 1 deletion pkg/cortex/cortex.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ func (c *Config) Validate(log log.Logger) error {
if err := c.Distributor.Validate(c.LimitsConfig); err != nil {
return errors.Wrap(err, "invalid distributor config")
}
if err := c.Querier.Validate(); err != nil {
if err := c.Querier.Validate(c.ResourceMonitor.Resources); err != nil {
return errors.Wrap(err, "invalid querier config")
}
if c.Querier.TimeoutClassificationEnabled && !c.Frontend.Handler.QueryStatsEnabled {
Expand Down
6 changes: 3 additions & 3 deletions pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ func (t *Cortex) initQueryable() (serv services.Service, err error) {
querierRegisterer := prometheus.WrapRegistererWith(prometheus.Labels{"engine": "querier"}, prometheus.DefaultRegisterer)

// Create a querier queryable and PromQL engine
t.QuerierQueryable, t.ExemplarQueryable, t.QuerierEngine = querier.New(t.Cfg.Querier, t.OverridesConfig, t.Distributor, t.StoreQueryables, querierRegisterer, util_log.Logger, t.OverridesConfig.QueryPartialData)
t.QuerierQueryable, t.ExemplarQueryable, t.QuerierEngine = querier.New(t.Cfg.Querier, t.OverridesConfig, t.Distributor, t.StoreQueryables, querierRegisterer, util_log.Logger, t.OverridesConfig.QueryPartialData, t.ResourceMonitor)

// Use distributor as default MetadataQuerier
t.MetadataQuerier = t.Distributor
Expand Down Expand Up @@ -701,7 +701,7 @@ func (t *Cortex) initRuler() (serv services.Service, err error) {
queryEngine = engine.New(opts, t.Cfg.Ruler.ThanosEngine, rulerRegisterer)
} else {
// TODO: Consider wrapping logger to differentiate from querier module logger
queryable, _, queryEngine = querier.New(t.Cfg.Querier, t.OverridesConfig, t.Distributor, t.StoreQueryables, rulerRegisterer, util_log.Logger, t.OverridesConfig.RulesPartialData)
queryable, _, queryEngine = querier.New(t.Cfg.Querier, t.OverridesConfig, t.Distributor, t.StoreQueryables, rulerRegisterer, util_log.Logger, t.OverridesConfig.RulesPartialData, nil)
}

managerFactory := ruler.DefaultTenantManagerFactory(t.Cfg.Ruler, pusher, queryable, queryEngine, t.OverridesConfig, metrics, prometheus.DefaultRegisterer)
Expand Down Expand Up @@ -949,7 +949,7 @@ func (t *Cortex) setupModuleManager() error {
Ingester: {IngesterService, OverridesConfig, API},
IngesterService: {OverridesConfig, RuntimeConfig, MemberlistKV, ResourceMonitor},
Flusher: {OverridesConfig, API},
Queryable: {OverridesConfig, DistributorService, OverridesConfig, Ring, API, StoreQueryable, MemberlistKV},
Queryable: {OverridesConfig, DistributorService, OverridesConfig, Ring, API, StoreQueryable, MemberlistKV, ResourceMonitor},
Querier: {TenantFederation},
StoreQueryable: {OverridesConfig, OverridesConfig, MemberlistKV, GrpcClientService},
QueryFrontendTripperware: {API, OverridesConfig},
Expand Down
69 changes: 65 additions & 4 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/thanos-io/thanos/pkg/strutil"
"golang.org/x/sync/errgroup"

"github.com/cortexproject/cortex/pkg/configs"
"github.com/cortexproject/cortex/pkg/engine"
"github.com/cortexproject/cortex/pkg/querier/batch"
"github.com/cortexproject/cortex/pkg/querier/lazyquery"
Expand All @@ -32,6 +33,7 @@ import (
"github.com/cortexproject/cortex/pkg/util/limiter"
util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/cortexproject/cortex/pkg/util/parquetutil"
"github.com/cortexproject/cortex/pkg/util/resource"
"github.com/cortexproject/cortex/pkg/util/spanlogger"
"github.com/cortexproject/cortex/pkg/util/users"
"github.com/cortexproject/cortex/pkg/util/validation"
Expand Down Expand Up @@ -103,6 +105,9 @@ type Config struct {
TimeoutClassificationEnabled bool `yaml:"timeout_classification_enabled"`
TimeoutClassificationDeadline time.Duration `yaml:"timeout_classification_deadline"`
TimeoutClassificationEvalThreshold time.Duration `yaml:"timeout_classification_eval_threshold"`

// Query protection: resource-based rejection.
QueryProtection configs.QueryProtection `yaml:"query_protection"`
}

var (
Expand Down Expand Up @@ -161,10 +166,11 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.TimeoutClassificationEnabled, "querier.timeout-classification-enabled", false, "If true, classify query timeouts as 4XX (user error) or 5XX (system error) based on phase timing.")
f.DurationVar(&cfg.TimeoutClassificationDeadline, "querier.timeout-classification-deadline", time.Minute+59*time.Second, "The total time before the querier proactively cancels a query for timeout classification. Set this a few seconds less than the querier timeout.")
f.DurationVar(&cfg.TimeoutClassificationEvalThreshold, "querier.timeout-classification-eval-threshold", time.Minute+30*time.Second, "Eval time threshold above which a timeout is classified as user error (4XX).")
cfg.QueryProtection.RegisterFlagsWithPrefix(f, "querier.")
}

// Validate the config
func (cfg *Config) Validate() error {
func (cfg *Config) Validate(monitoredResources flagext.StringSliceCSV) error {

if cfg.ResponseCompression != "" && cfg.ResponseCompression != "gzip" && cfg.ResponseCompression != "snappy" && cfg.ResponseCompression != "zstd" {
return errUnsupportedResponseCompression
Expand Down Expand Up @@ -207,6 +213,10 @@ func (cfg *Config) Validate() error {
return err
}

if err := cfg.QueryProtection.Validate(monitoredResources); err != nil {
return err
}

return nil
}

Expand All @@ -223,9 +233,28 @@ func getChunksIteratorFunction(_ Config) chunkIteratorFunc {
}

// New builds a queryable and promql engine.
func New(cfg Config, limits *validation.Overrides, distributor Distributor, stores []QueryableWithFilter, reg prometheus.Registerer, logger log.Logger, isPartialDataEnabled partialdata.IsCfgEnabledFunc) (storage.SampleAndChunkQueryable, storage.ExemplarQueryable, engine.QueryEngine) {
func New(cfg Config, limits *validation.Overrides, distributor Distributor, stores []QueryableWithFilter, reg prometheus.Registerer, logger log.Logger, isPartialDataEnabled partialdata.IsCfgEnabledFunc, resourceMonitor resource.IMonitor) (storage.SampleAndChunkQueryable, storage.ExemplarQueryable, engine.QueryEngine) {
iteratorFunc := getChunksIteratorFunction(cfg)

// Create resource-based limiter if resource monitor is available and thresholds are configured.
var resourceBasedLimiter *limiter.ResourceBasedLimiter
if resourceMonitor != nil {
resourceLimits := make(map[resource.Type]float64)
if cfg.QueryProtection.Rejection.Threshold.CPUUtilization > 0 {
resourceLimits[resource.CPU] = cfg.QueryProtection.Rejection.Threshold.CPUUtilization
}
if cfg.QueryProtection.Rejection.Threshold.HeapUtilization > 0 {
resourceLimits[resource.Heap] = cfg.QueryProtection.Rejection.Threshold.HeapUtilization
}
if len(resourceLimits) > 0 {
var err error
resourceBasedLimiter, err = limiter.NewResourceBasedLimiter(resourceMonitor, resourceLimits, reg, "querier")
if err != nil {
level.Error(logger).Log("msg", "failed to create resource based limiter for querier", "err", err)
}
}
}

distributorQueryable := newDistributorQueryable(distributor, cfg.IngesterMetadataStreaming, cfg.IngesterLabelNamesWithMatchers, iteratorFunc, isPartialDataEnabled, cfg.IngesterQueryMaxAttempts, limits, nil)

ns := make([]QueryableWithFilter, len(stores))
Expand All @@ -235,7 +264,7 @@ func New(cfg Config, limits *validation.Overrides, distributor Distributor, stor
limits: limits,
}
}
queryable := NewQueryable(distributorQueryable, ns, cfg, limits)
queryable := NewQueryable(distributorQueryable, ns, cfg, limits, resourceBasedLimiter, logger)
exemplarQueryable := newDistributorExemplarQueryable(distributor)

lazyQueryable := storage.QueryableFunc(func(mint int64, maxt int64) (storage.Querier, error) {
Expand Down Expand Up @@ -311,7 +340,7 @@ type limiterHolder struct {
}

// NewQueryable creates a new Queryable for cortex.
func NewQueryable(distributor QueryableWithFilter, stores []QueryableWithFilter, cfg Config, limits *validation.Overrides) storage.Queryable {
func NewQueryable(distributor QueryableWithFilter, stores []QueryableWithFilter, cfg Config, limits *validation.Overrides, resourceBasedLimiter *limiter.ResourceBasedLimiter, logger log.Logger) storage.Queryable {
return storage.QueryableFunc(func(mint, maxt int64) (storage.Querier, error) {
q := querier{
now: time.Now(),
Expand All @@ -324,6 +353,8 @@ func NewQueryable(distributor QueryableWithFilter, stores []QueryableWithFilter,
distributor: distributor,
stores: stores,
limiterHolder: &limiterHolder{},
resourceBasedLimiter: resourceBasedLimiter,
logger: logger,
}

return q, nil
Expand All @@ -339,6 +370,8 @@ type querier struct {
distributor QueryableWithFilter
stores []QueryableWithFilter
limiterHolder *limiterHolder
resourceBasedLimiter *limiter.ResourceBasedLimiter
logger log.Logger

ignoreMaxQueryLength bool
}
Expand Down Expand Up @@ -390,6 +423,11 @@ func (q querier) setupFromCtx(ctx context.Context) (context.Context, *querier_st
// Select implements storage.Querier interface.
// The bool passed is ignored because the series is always sorted.
func (q querier) Select(ctx context.Context, sortSeries bool, sp *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
// Check resource utilization before processing the query.
if err := q.checkResourceUtilization(); err != nil {
return storage.ErrSeriesSet(err)
}

ctx, stats, userID, mint, maxt, metadataQuerier, queriers, err := q.setupFromCtx(ctx)
if err == errEmptyTimeRange {
return storage.EmptySeriesSet()
Expand Down Expand Up @@ -490,6 +528,11 @@ func (q querier) Select(ctx context.Context, sortSeries bool, sp *storage.Select

// LabelValues implements storage.Querier.
func (q querier) LabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
// Check resource utilization before processing the query.
if err := q.checkResourceUtilization(); err != nil {
return nil, nil, err
}

ctx, stats, userID, mint, maxt, metadataQuerier, queriers, err := q.setupFromCtx(ctx)
if err == errEmptyTimeRange {
return nil, nil, nil
Expand Down Expand Up @@ -558,6 +601,11 @@ func (q querier) LabelValues(ctx context.Context, name string, hints *storage.La
}

func (q querier) LabelNames(ctx context.Context, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
// Check resource utilization before processing the query.
if err := q.checkResourceUtilization(); err != nil {
return nil, nil, err
}

ctx, stats, userID, mint, maxt, metadataQuerier, queriers, err := q.setupFromCtx(ctx)
if err == errEmptyTimeRange {
return nil, nil, nil
Expand Down Expand Up @@ -625,6 +673,19 @@ func (q querier) LabelNames(ctx context.Context, hints *storage.LabelHints, matc
return strutil.MergeSlices(limit, sets...), warnings, nil
}

func (q querier) checkResourceUtilization() error {
if q.resourceBasedLimiter == nil {
return nil
}

if err := q.resourceBasedLimiter.AcceptNewRequest(); err != nil {
level.Warn(q.logger).Log("msg", "querier failed to accept request due to resource utilization", "err", err)
return limiter.ErrResourceLimitReached
}

return nil
}

func (querier) Close() error {
return nil
}
Expand Down
Loading
Loading