diff --git a/.chloggen/deprecate_default_fetch_size.yaml b/.chloggen/deprecate_default_fetch_size.yaml
new file mode 100644
index 0000000000000..03bed1b6915dd
--- /dev/null
+++ b/.chloggen/deprecate_default_fetch_size.yaml
@@ -0,0 +1,30 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: deprecation
+
+# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog)
+component: receiver/kafka
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: "Deprecate `default_fetch_size` parameter for franz-go client"
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [43104]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext: |
+  The `default_fetch_size` parameter is now deprecated for the franz-go Kafka client and will only be used with the legacy Sarama client.
+  Users should configure `max_fetch_size` instead when using franz-go.
+  This parameter is deprecated as of v0.142.0.
+
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: [user]
diff --git a/.chloggen/fix_43104.yaml b/.chloggen/fix_43104.yaml
new file mode 100644
index 0000000000000..ba4679dd9b9b6
--- /dev/null
+++ b/.chloggen/fix_43104.yaml
@@ -0,0 +1,30 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: bug_fix
+
+# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog)
+component: receiver/kafka
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: "Use `max_fetch_size` instead of `default_fetch_size` in franz-go client"
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [43104]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext: |
+  The franz-go Kafka consumer was incorrectly using `default_fetch_size` (a Sarama-specific setting) instead of `max_fetch_size` when configuring `kgo.FetchMaxBytes`.
+  This fix ensures the correct parameter is used and adds validation to prevent `max_fetch_size` from being less than `min_fetch_size`.
+  The default value for `max_fetch_size` has been changed from 0 (unlimited) to 1048576 (1 MiB) to maintain backward compatibility with the previous (incorrect) behavior.
+
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: [user]
diff --git a/internal/kafka/franz_client.go b/internal/kafka/franz_client.go
index fc7cd48486a74..b0bd79737c08b 100644
--- a/internal/kafka/franz_client.go
+++ b/internal/kafka/franz_client.go
@@ -104,6 +104,8 @@ func NewFranzConsumerGroup(ctx context.Context, clientCfg configkafka.ClientConf
 	opts, err := commonOpts(ctx, clientCfg, logger, append([]kgo.Opt{
 		kgo.ConsumeTopics(topics...),
 		kgo.ConsumerGroup(consumerCfg.GroupID),
+		kgo.FetchMinBytes(consumerCfg.MinFetchSize),
+		kgo.FetchMaxBytes(consumerCfg.MaxFetchSize),
 	}, opts...)...)
 	if err != nil {
 		return nil, err
@@ -136,13 +138,7 @@ func NewFranzConsumerGroup(ctx context.Context, clientCfg configkafka.ClientConf
 		opts = append(opts, kgo.HeartbeatInterval(consumerCfg.HeartbeatInterval))
 	}
 
-	// Configure fetch sizes
-	if consumerCfg.MinFetchSize > 0 {
-		opts = append(opts, kgo.FetchMinBytes(consumerCfg.MinFetchSize))
-	}
-	if consumerCfg.DefaultFetchSize > 0 {
-		opts = append(opts, kgo.FetchMaxBytes(consumerCfg.DefaultFetchSize))
-	}
+	// Configure per-partition fetch size
 	if consumerCfg.MaxPartitionFetchSize > 0 {
 		opts = append(opts, kgo.FetchMaxPartitionBytes(consumerCfg.MaxPartitionFetchSize))
 	}
diff --git a/pkg/kafka/configkafka/config.go b/pkg/kafka/configkafka/config.go
index 16ae596b7018b..8a5ed599f7193 100644
--- a/pkg/kafka/configkafka/config.go
+++ b/pkg/kafka/configkafka/config.go
@@ -110,9 +110,10 @@ type ConsumerConfig struct {
 	MinFetchSize int32 `mapstructure:"min_fetch_size"`
 
 	// The default bytes per fetch from Kafka (default "1048576")
+	// Deprecated: only used with the Sarama client. Use MaxFetchSize for franz-go.
 	DefaultFetchSize int32 `mapstructure:"default_fetch_size"`
 
-	// The maximum bytes per fetch from Kafka (default "0", no limit)
+	// The maximum bytes per fetch from Kafka (default "1048576")
 	MaxFetchSize int32 `mapstructure:"max_fetch_size"`
 
 	// The maximum amount of time to wait for MinFetchSize bytes to be
@@ -145,7 +146,7 @@ func NewDefaultConsumerConfig() ConsumerConfig {
 			Interval: time.Second,
 		},
 		MinFetchSize: 1,
-		MaxFetchSize: 0,
+		MaxFetchSize: 1048576,
 		MaxFetchWait: 250 * time.Millisecond,
 		DefaultFetchSize: 1048576,
 		MaxPartitionFetchSize: 1048576,
@@ -174,6 +175,22 @@ func (c ConsumerConfig) Validate() error {
 			)
 		}
 	}
+
+	// Validate fetch size constraints
+	if c.MinFetchSize < 0 {
+		return fmt.Errorf("min_fetch_size (%d) must be non-negative", c.MinFetchSize)
+	}
+	if c.MaxFetchSize < 0 {
+		return fmt.Errorf("max_fetch_size (%d) must be non-negative", c.MaxFetchSize)
+	}
+	if c.MaxFetchSize < c.MinFetchSize {
+		return fmt.Errorf(
+			"max_fetch_size (%d) cannot be less than min_fetch_size (%d)",
+			c.MaxFetchSize,
+			c.MinFetchSize,
+		)
+	}
+
 	return nil
 }
diff --git a/pkg/kafka/configkafka/config_test.go b/pkg/kafka/configkafka/config_test.go
index 3bee81fe36e8e..3149f73a431ff 100644
--- a/pkg/kafka/configkafka/config_test.go
+++ b/pkg/kafka/configkafka/config_test.go
@@ -167,11 +167,34 @@ func TestConsumerConfig(t *testing.T) {
 			MaxPartitionFetchSize: 4096,
 		},
 	},
+	"zero_min_fetch_size": {
+		expected: ConsumerConfig{
+			SessionTimeout: 10 * time.Second,
+			HeartbeatInterval: 3 * time.Second,
+			GroupID: "otel-collector",
+			InitialOffset: "latest",
+			AutoCommit: AutoCommitConfig{
+				Enable: true,
+				Interval: 1 * time.Second,
+			},
+			MinFetchSize: 0,
+			DefaultFetchSize: 1048576,
+			MaxFetchSize: 1048576,
+			MaxFetchWait: 250 * time.Millisecond,
+			MaxPartitionFetchSize: 1048576,
+		},
+	},
 
 	// Invalid configurations
 	"invalid_initial_offset": {
 		expectedErr: "initial_offset should be one of 'latest' or 'earliest'. configured value middle",
 	},
+	"invalid_fetch_size": {
+		expectedErr: "max_fetch_size (100) cannot be less than min_fetch_size (1000)",
+	},
+	"negative_min_fetch_size": {
+		expectedErr: "min_fetch_size (-100) must be non-negative",
+	},
 	})
 }
diff --git a/pkg/kafka/configkafka/testdata/consumer_config.yaml b/pkg/kafka/configkafka/testdata/consumer_config.yaml
index dc2a9d85127ab..938f02d15cafe 100644
--- a/pkg/kafka/configkafka/testdata/consumer_config.yaml
+++ b/pkg/kafka/configkafka/testdata/consumer_config.yaml
@@ -12,7 +12,16 @@ kafka/full:
   max_fetch_size: 4096
   max_fetch_wait: 1s
   max_partition_fetch_size: 4096
+kafka/zero_min_fetch_size:
+  min_fetch_size: 0
+  max_fetch_size: 1048576
 
 # Invalid configurations
 kafka/invalid_initial_offset:
   initial_offset: middle
+kafka/invalid_fetch_size:
+  min_fetch_size: 1000
+  max_fetch_size: 100
+kafka/negative_min_fetch_size:
+  min_fetch_size: -100
+  max_fetch_size: 1048576
diff --git a/receiver/kafkareceiver/README.md b/receiver/kafkareceiver/README.md
index 668ed2d117037..23cf34ee1f3cd 100644
--- a/receiver/kafkareceiver/README.md
+++ b/receiver/kafkareceiver/README.md
@@ -90,8 +90,8 @@ The following settings can be optionally configured:
   - If set to an empty string (or not set), the consumer is treated as a dynamic member. In this case, the consumer's partition assignments may change during rebalances.
   - Using a `group_instance_id` is useful for stateful consumers or when you need to ensure that a specific consumer instance is always assigned the same set of partitions.
 - `min_fetch_size` (default = `1`): The minimum number of message bytes to fetch in a request, defaults to 1 byte.
-- `default_fetch_size` (default = `1048576`): The default number of message bytes to fetch in a request, defaults to 1MB.
-- `max_fetch_size` (default = `0`): The maximum number of message bytes to fetch in a request, defaults to unlimited.
+- `default_fetch_size` (default = `1048576`): (Deprecated [v0.142.0]: only used with the legacy Sarama client; use `max_fetch_size` instead) The default number of message bytes to fetch in a request, defaults to 1MB.
+- `max_fetch_size` (default = `1048576`): The maximum number of message bytes to fetch in a request, defaults to 1MB. Must be greater than or equal to `min_fetch_size`.
 - `max_fetch_wait` (default = `250ms`): The maximum amount of time the broker should wait for `min_fetch_size` bytes to be available before returning anyway.
 - `max_partition_fetch_size` (default = `1048576`): The default number of message bytes to fetch in a request per partition, defaults to 1MB. If a single record batch is larger than this value, the broker will still return it to ensure the consumer can make progress. This setting only applies while using [`franz-go`](https://github.com/twmb/franz-go).
 - `tls`: see [TLS Configuration Settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configtls/README.md) for the full set of available options.
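For reviewers, a minimal configuration sketch tying the fetch-size settings above together. The broker address and the concrete values are illustrative assumptions, not recommendations; `default_fetch_size` is shown commented out only to flag its deprecated status.

```yaml
receivers:
  kafka:
    brokers: ["localhost:9092"]         # illustrative broker address
    min_fetch_size: 1                   # minimum bytes the broker returns per fetch
    max_fetch_size: 1048576             # maximum bytes per fetch; must be >= min_fetch_size
    max_partition_fetch_size: 1048576   # per-partition cap on a single fetch
    max_fetch_wait: 250ms               # how long the broker may wait for min_fetch_size bytes
    # default_fetch_size: 1048576       # deprecated in v0.142.0; only honored by the legacy Sarama client
```

With the validation added in this change, a configuration where `min_fetch_size` or `max_fetch_size` is negative, or where `max_fetch_size` is smaller than `min_fetch_size`, fails config validation.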