Skip to content

Commit 08a4be9

Browse files
authored
[receiver/kafka] Use max_fetch_size instead of default_fetch_size for franz-go consumer (#44748)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description The franz-go Kafka consumer was using `default_fetch_size` (a Sarama-specific setting) instead of `max_fetch_size` when configuring `kgo.FetchMaxBytes`. This fix ensures the parameter `max_fetch_size` is used and adds validation to prevent it from being less than `min_fetch_size`. The default value for `max_fetch_size` has been changed from 0 (unlimited) to 1048576 (1 MiB) to maintain backward compatibility with the previous behavior. The default for franz-go is [50MiB](https://github.com/twmb/franz-go/blob/master/pkg/kgo/config.go#L1313-L1328). The `default_fetch_size` parameter is now marked as deprecated for franz-go and will only be used with the legacy Sarama client. <!-- Issue number (e.g. #1234) or full URL to issue, if applicable. --> #### Link to tracking issue Fixes #43104 <!--Describe what testing was performed and which tests were added.--> #### Testing Added tests to ensure `max_fetch_size` is equal to or higher than `min_fetch_size`. <!--Describe the documentation added.--> #### Documentation Updated documentation referring to the `default_fetch_size` is deprecated. <!--Please delete paragraphs that you did not use before submitting.--> --------- Signed-off-by: Paulo Dias <[email protected]>
1 parent b716917 commit 08a4be9

File tree

7 files changed

+116
-11
lines changed

7 files changed

+116
-11
lines changed
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: deprecation
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog)
7+
component: receiver/kafka
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`)`.
10+
note: "Deprecate `default_fetch_size` parameter for franz-go client"
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [43104]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: |
19+
The `default_fetch_size` parameter is now deprecated for the franz-go Kafka client and will only be used with the legacy Sarama client.
20+
Users should configure `max_fetch_size` instead when using franz-go.
21+
This deprecation is marked as of v0.142.0.
22+
23+
# If your change doesn't affect end users or the exported elements of any package,
24+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
25+
# Optional: The change log or logs in which this entry should be included.
26+
# e.g. '[user]' or '[user, api]'
27+
# Include 'user' if the change is relevant to end users.
28+
# Include 'api' if there is a change to a library API.
29+
# Default: '[user]'
30+
change_logs: [user]

.chloggen/fix_43104.yaml

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: bug_fix
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog)
7+
component: receiver/kafka
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: "Use `max_fetch_size` instead of `default_fetch_size` in franz-go client"
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [43104]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: |
19+
The franz-go Kafka consumer was incorrectly using `default_fetch_size` (a Sarama-specific setting) instead of `max_fetch_size` when configuring `kgo.FetchMaxBytes`.
20+
This fix ensures the correct parameter is used and adds validation to prevent `max_fetch_size` from being less than `min_fetch_size`.
21+
The default value for `max_fetch_size` has been changed from 0 (unlimited) to 1048576 (1 MiB) to maintain backward compatibility with the previous (incorrect) behavior.
22+
23+
# If your change doesn't affect end users or the exported elements of any package,
24+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
25+
# Optional: The change log or logs in which this entry should be included.
26+
# e.g. '[user]' or '[user, api]'
27+
# Include 'user' if the change is relevant to end users.
28+
# Include 'api' if there is a change to a library API.
29+
# Default: '[user]'
30+
change_logs: [user]

internal/kafka/franz_client.go

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,8 @@ func NewFranzConsumerGroup(ctx context.Context, clientCfg configkafka.ClientConf
104104
opts, err := commonOpts(ctx, clientCfg, logger, append([]kgo.Opt{
105105
kgo.ConsumeTopics(topics...),
106106
kgo.ConsumerGroup(consumerCfg.GroupID),
107+
kgo.FetchMinBytes(consumerCfg.MinFetchSize),
108+
kgo.FetchMaxBytes(consumerCfg.MaxFetchSize),
107109
}, opts...)...)
108110
if err != nil {
109111
return nil, err
@@ -136,13 +138,7 @@ func NewFranzConsumerGroup(ctx context.Context, clientCfg configkafka.ClientConf
136138
opts = append(opts, kgo.HeartbeatInterval(consumerCfg.HeartbeatInterval))
137139
}
138140

139-
// Configure fetch sizes
140-
if consumerCfg.MinFetchSize > 0 {
141-
opts = append(opts, kgo.FetchMinBytes(consumerCfg.MinFetchSize))
142-
}
143-
if consumerCfg.DefaultFetchSize > 0 {
144-
opts = append(opts, kgo.FetchMaxBytes(consumerCfg.DefaultFetchSize))
145-
}
141+
// Configure per-partition fetch size
146142
if consumerCfg.MaxPartitionFetchSize > 0 {
147143
opts = append(opts, kgo.FetchMaxPartitionBytes(consumerCfg.MaxPartitionFetchSize))
148144
}

pkg/kafka/configkafka/config.go

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -110,9 +110,10 @@ type ConsumerConfig struct {
110110
MinFetchSize int32 `mapstructure:"min_fetch_size"`
111111

112112
// The default bytes per fetch from Kafka (default "1048576")
113+
// Only used with Sarama client. Use MaxFetchSize for franz-go.
113114
DefaultFetchSize int32 `mapstructure:"default_fetch_size"`
114115

115-
// The maximum bytes per fetch from Kafka (default "0", no limit)
116+
// The maximum bytes per fetch from Kafka (default "1048576")
116117
MaxFetchSize int32 `mapstructure:"max_fetch_size"`
117118

118119
// The maximum amount of time to wait for MinFetchSize bytes to be
@@ -145,7 +146,7 @@ func NewDefaultConsumerConfig() ConsumerConfig {
145146
Interval: time.Second,
146147
},
147148
MinFetchSize: 1,
148-
MaxFetchSize: 0,
149+
MaxFetchSize: 1048576,
149150
MaxFetchWait: 250 * time.Millisecond,
150151
DefaultFetchSize: 1048576,
151152
MaxPartitionFetchSize: 1048576,
@@ -174,6 +175,22 @@ func (c ConsumerConfig) Validate() error {
174175
)
175176
}
176177
}
178+
179+
// Validate fetch size constraints
180+
if c.MinFetchSize < 0 {
181+
return fmt.Errorf("min_fetch_size (%d) must be non-negative", c.MinFetchSize)
182+
}
183+
if c.MaxFetchSize < 0 {
184+
return fmt.Errorf("max_fetch_size (%d) must be non-negative", c.MaxFetchSize)
185+
}
186+
if c.MaxFetchSize < c.MinFetchSize {
187+
return fmt.Errorf(
188+
"max_fetch_size (%d) cannot be less than min_fetch_size (%d)",
189+
c.MaxFetchSize,
190+
c.MinFetchSize,
191+
)
192+
}
193+
177194
return nil
178195
}
179196

pkg/kafka/configkafka/config_test.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,11 +167,34 @@ func TestConsumerConfig(t *testing.T) {
167167
MaxPartitionFetchSize: 4096,
168168
},
169169
},
170+
"zero_min_fetch_size": {
171+
expected: ConsumerConfig{
172+
SessionTimeout: 10 * time.Second,
173+
HeartbeatInterval: 3 * time.Second,
174+
GroupID: "otel-collector",
175+
InitialOffset: "latest",
176+
AutoCommit: AutoCommitConfig{
177+
Enable: true,
178+
Interval: 1 * time.Second,
179+
},
180+
MinFetchSize: 0,
181+
DefaultFetchSize: 1048576,
182+
MaxFetchSize: 1048576,
183+
MaxFetchWait: 250 * time.Millisecond,
184+
MaxPartitionFetchSize: 1048576,
185+
},
186+
},
170187

171188
// Invalid configurations
172189
"invalid_initial_offset": {
173190
expectedErr: "initial_offset should be one of 'latest' or 'earliest'. configured value middle",
174191
},
192+
"invalid_fetch_size": {
193+
expectedErr: "max_fetch_size (100) cannot be less than min_fetch_size (1000)",
194+
},
195+
"negative_min_fetch_size": {
196+
expectedErr: "min_fetch_size (-100) must be non-negative",
197+
},
175198
})
176199
}
177200

pkg/kafka/configkafka/testdata/consumer_config.yaml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,16 @@ kafka/full:
1212
max_fetch_size: 4096
1313
max_fetch_wait: 1s
1414
max_partition_fetch_size: 4096
15+
kafka/zero_min_fetch_size:
16+
min_fetch_size: 0
17+
max_fetch_size: 1048576
1518

1619
# Invalid configurations
1720
kafka/invalid_initial_offset:
1821
initial_offset: middle
22+
kafka/invalid_fetch_size:
23+
min_fetch_size: 1000
24+
max_fetch_size: 100
25+
kafka/negative_min_fetch_size:
26+
min_fetch_size: -100
27+
max_fetch_size: 1048576

receiver/kafkareceiver/README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,8 +90,8 @@ The following settings can be optionally configured:
9090
- If set to an empty string (or not set), the consumer is treated as a dynamic member. In this case, the consumer's partition assignments may change during rebalances.
9191
- Using a `group_instance_id` is useful for stateful consumers or when you need to ensure that a specific consumer instance is always assigned the same set of partitions.
9292
- `min_fetch_size` (default = `1`): The minimum number of message bytes to fetch in a request, defaults to 1 byte.
93-
- `default_fetch_size` (default = `1048576`): The default number of message bytes to fetch in a request, defaults to 1MB.
94-
- `max_fetch_size` (default = `0`): The maximum number of message bytes to fetch in a request, defaults to unlimited.
93+
- `default_fetch_size` (default = `1048576`): (Deprecated [v0.142.0]: only used with legacy Sarama client. Use `max_fetch_size` instead) The default number of message bytes to fetch in a request, defaults to 1MB.
94+
- `max_fetch_size` (default = `1048576`): The maximum number of message bytes to fetch in a request, defaults to 1MB. Must be greater than or equal to `min_fetch_size`.
9595
- `max_fetch_wait` (default = `250ms`): The maximum amount of time the broker should wait for `min_fetch_size` bytes to be available before returning anyway.
9696
- `max_partition_fetch_size` (default = `1048576`): The default number of message bytes to fetch in a request per partition, defaults to 1MB. If a single record batch is larger than this value, the broker will still return it to ensure the consumer can make progress. This setting only applies while using [`franz-go`](https://github.com/twmb/franz-go).
9797
- `tls`: see [TLS Configuration Settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configtls/README.md) for the full set of available options.

0 commit comments

Comments
 (0)