confluent-kafka-dotnet

Class ConsumerConfig

Consumer configuration properties

Inheritance
object
Config
ClientConfig
ConsumerConfig
Implements
IEnumerable<KeyValuePair<string, string>>
IEnumerable
Inherited Members
ClientConfig.SaslMechanism
ClientConfig.Acks
ClientConfig.ClientId
ClientConfig.BootstrapServers
ClientConfig.MessageMaxBytes
ClientConfig.MessageCopyMaxBytes
ClientConfig.ReceiveMessageMaxBytes
ClientConfig.MaxInFlight
ClientConfig.TopicMetadataRefreshIntervalMs
ClientConfig.MetadataMaxAgeMs
ClientConfig.TopicMetadataRefreshFastIntervalMs
ClientConfig.TopicMetadataRefreshSparse
ClientConfig.TopicMetadataPropagationMaxMs
ClientConfig.TopicBlacklist
ClientConfig.Debug
ClientConfig.SocketTimeoutMs
ClientConfig.SocketSendBufferBytes
ClientConfig.SocketReceiveBufferBytes
ClientConfig.SocketKeepaliveEnable
ClientConfig.SocketNagleDisable
ClientConfig.SocketMaxFails
ClientConfig.BrokerAddressTtl
ClientConfig.BrokerAddressFamily
ClientConfig.SocketConnectionSetupTimeoutMs
ClientConfig.ConnectionsMaxIdleMs
ClientConfig.ReconnectBackoffMs
ClientConfig.ReconnectBackoffMaxMs
ClientConfig.StatisticsIntervalMs
ClientConfig.LogQueue
ClientConfig.LogThreadName
ClientConfig.EnableRandomSeed
ClientConfig.LogConnectionClose
ClientConfig.InternalTerminationSignal
ClientConfig.ApiVersionRequest
ClientConfig.ApiVersionRequestTimeoutMs
ClientConfig.ApiVersionFallbackMs
ClientConfig.BrokerVersionFallback
ClientConfig.AllowAutoCreateTopics
ClientConfig.SecurityProtocol
ClientConfig.SslCipherSuites
ClientConfig.SslCurvesList
ClientConfig.SslSigalgsList
ClientConfig.SslKeyLocation
ClientConfig.SslKeyPassword
ClientConfig.SslKeyPem
ClientConfig.SslCertificateLocation
ClientConfig.SslCertificatePem
ClientConfig.SslCaLocation
ClientConfig.SslCaPem
ClientConfig.SslCaCertificateStores
ClientConfig.SslCrlLocation
ClientConfig.SslKeystoreLocation
ClientConfig.SslKeystorePassword
ClientConfig.SslProviders
ClientConfig.SslEngineLocation
ClientConfig.SslEngineId
ClientConfig.EnableSslCertificateVerification
ClientConfig.SslEndpointIdentificationAlgorithm
ClientConfig.SaslKerberosServiceName
ClientConfig.SaslKerberosPrincipal
ClientConfig.SaslKerberosKinitCmd
ClientConfig.SaslKerberosKeytab
ClientConfig.SaslKerberosMinTimeBeforeRelogin
ClientConfig.SaslUsername
ClientConfig.SaslPassword
ClientConfig.SaslOauthbearerConfig
ClientConfig.EnableSaslOauthbearerUnsecureJwt
ClientConfig.SaslOauthbearerMethod
ClientConfig.SaslOauthbearerClientId
ClientConfig.SaslOauthbearerClientSecret
ClientConfig.SaslOauthbearerScope
ClientConfig.SaslOauthbearerExtensions
ClientConfig.SaslOauthbearerTokenEndpointUrl
ClientConfig.PluginLibraryPaths
ClientConfig.ClientRack
ClientConfig.RetryBackoffMs
ClientConfig.RetryBackoffMaxMs
ClientConfig.ClientDnsLookup
ClientConfig.EnableMetricsPush
Config.Set(string, string)
Config.Get(string)
Config.GetInt(string)
Config.GetBool(string)
Config.GetDouble(string)
Config.GetEnum(Type, string)
Config.SetObject(string, object)
Config.properties
Config.GetEnumerator()
Config.CancellationDelayMaxMs
object.Equals(object)
object.Equals(object, object)
object.GetHashCode()
object.GetType()
object.MemberwiseClone()
object.ReferenceEquals(object, object)
object.ToString()
Namespace: Confluent.Kafka
Assembly: Confluent.Kafka.dll
Syntax
public class ConsumerConfig : ClientConfig, IEnumerable<KeyValuePair<string, string>>, IEnumerable

Constructors

ConsumerConfig()

Initialize a new empty ConsumerConfig instance.

Declaration
public ConsumerConfig()

ConsumerConfig(ClientConfig)

Initialize a new ConsumerConfig instance wrapping an existing ClientConfig instance. This will change the values "in-place", i.e. operations on this class WILL modify the provided collection.

Declaration
public ConsumerConfig(ClientConfig config)
Parameters
Type Name Description
ClientConfig config

ConsumerConfig(IDictionary<string, string>)

Initialize a new ConsumerConfig instance wrapping an existing key/value pair collection. This will change the values "in-place", i.e. operations on this class WILL modify the provided collection.

Declaration
public ConsumerConfig(IDictionary<string, string> config)
Parameters
Type Name Description
IDictionary<string, string> config
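Both wrapping constructors can be sketched as follows; the broker address, group id, and dictionary keys are placeholder values, not requirements of the API:

```csharp
using System.Collections.Generic;
using Confluent.Kafka;

// Start from a shared ClientConfig, then layer consumer-specific settings
// on top. Note: the wrapped instance is modified in place.
var clientConfig = new ClientConfig { BootstrapServers = "localhost:9092" };
var consumerConfig = new ConsumerConfig(clientConfig)
{
    GroupId = "example-group"
};

// Equivalent construction from a raw key/value dictionary; the dictionary
// itself is also modified by subsequent operations on the config.
var fromDict = new ConsumerConfig(new Dictionary<string, string>
{
    ["bootstrap.servers"] = "localhost:9092",
    ["group.id"] = "example-group"
});
```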

Properties

AutoCommitIntervalMs

The frequency in milliseconds that the consumer offsets are committed (written) to offset storage. (0 = disable). This setting is used by the high-level consumer.

default: 5000 importance: medium

Declaration
public int? AutoCommitIntervalMs { get; set; }
Property Value
Type Description
int?

AutoOffsetReset

Action to take when there is no initial offset in offset store or the desired offset is out of range: 'smallest','earliest' - automatically reset the offset to the smallest offset, 'largest','latest' - automatically reset the offset to the largest offset, 'error' - trigger an error (ERR__AUTO_OFFSET_RESET) which is retrieved by consuming messages and checking 'message->err'.

default: largest importance: high

Declaration
public AutoOffsetReset? AutoOffsetReset { get; set; }
Property Value
Type Description
AutoOffsetReset?
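As a minimal sketch, the strongly typed enum can be used to start reading from the beginning of each partition when the group has no committed offset yet (broker and group values are placeholders):

```csharp
using Confluent.Kafka;

var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",       // placeholder
    GroupId = "example-group",                 // placeholder
    // Read from the earliest available offset when no committed offset exists.
    AutoOffsetReset = AutoOffsetReset.Earliest
};
```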

CheckCrcs

Verify CRC32 of consumed messages, ensuring no on-the-wire or on-disk corruption to the messages occurred. This check comes at slightly increased CPU usage.

default: false importance: medium

Declaration
public bool? CheckCrcs { get; set; }
Property Value
Type Description
bool?

ConsumeResultFields

A comma separated list of fields that may be optionally set in ConsumeResult<TKey, TValue> objects returned by the Consume(TimeSpan) method. Disabling fields that you do not require will improve throughput and reduce memory consumption. Allowed values: headers, timestamp, topic, all, none

default: all importance: low

Declaration
public string ConsumeResultFields { set; }
Property Value
Type Description
string

CoordinatorQueryIntervalMs

How often to query for the current client group coordinator. If the currently assigned coordinator is down the configured query interval will be divided by ten to more quickly recover in case of coordinator reassignment.

default: 600000 importance: low

Declaration
public int? CoordinatorQueryIntervalMs { get; set; }
Property Value
Type Description
int?

EnableAutoCommit

Automatically and periodically commit offsets in the background. Note: setting this to false does not prevent the consumer from fetching previously committed start offsets. To circumvent this behaviour set specific start offsets per partition in the call to assign().

default: true importance: high

Declaration
public bool? EnableAutoCommit { get; set; }
Property Value
Type Description
bool?

EnableAutoOffsetStore

Automatically store offset of last message provided to application. The offset store is an in-memory store of the next offset to (auto-)commit for each partition.

default: true importance: high

Declaration
public bool? EnableAutoOffsetStore { get; set; }
Property Value
Type Description
bool?
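A common at-least-once pattern is to keep auto-commit enabled but disable the automatic offset store, so an offset only becomes eligible for commit after the message has been processed. A sketch, assuming a hypothetical topic `my-topic` and a hypothetical `Process` handler:

```csharp
using System;
using Confluent.Kafka;

var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",   // placeholder
    GroupId = "example-group",             // placeholder
    EnableAutoCommit = true,               // background commits stay on
    EnableAutoOffsetStore = false          // but only store offsets we choose
};

using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
consumer.Subscribe("my-topic");            // placeholder topic

while (true)
{
    var result = consumer.Consume(TimeSpan.FromSeconds(1));
    if (result == null) continue;
    Process(result.Message.Value);         // hypothetical handler
    // Mark the offset for the next auto-commit only after processing succeeded.
    consumer.StoreOffset(result);
}
```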

EnablePartitionEof

Emit RD_KAFKA_RESP_ERR__PARTITION_EOF event whenever the consumer reaches the end of a partition.

default: false importance: low

Declaration
public bool? EnablePartitionEof { get; set; }
Property Value
Type Description
bool?
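With this property enabled, end-of-partition events surface as results whose `IsPartitionEOF` flag is set; such results carry no message payload. A sketch with placeholder broker, group, and topic values:

```csharp
using System;
using Confluent.Kafka;

var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",  // placeholder
    GroupId = "example-group",            // placeholder
    EnablePartitionEof = true
};

using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
consumer.Subscribe("my-topic");           // placeholder topic

while (true)
{
    var result = consumer.Consume();
    if (result.IsPartitionEOF)
    {
        // End of partition reached; this result has no message.
        Console.WriteLine($"Reached end of {result.TopicPartitionOffset}");
        continue;
    }
    Console.WriteLine(result.Message.Value);
}
```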

FetchErrorBackoffMs

How long to postpone the next fetch request for a topic+partition in case of a fetch error.

default: 500 importance: medium

Declaration
public int? FetchErrorBackoffMs { get; set; }
Property Value
Type Description
int?

FetchMaxBytes

Maximum amount of data the broker shall return for a Fetch request. Messages are fetched in batches by the consumer and if the first message batch in the first non-empty partition of the Fetch request is larger than this value, then the message batch will still be returned to ensure the consumer can make progress. The maximum message batch size accepted by the broker is defined via message.max.bytes (broker config) or max.message.bytes (broker topic config). fetch.max.bytes is automatically adjusted upwards to be at least message.max.bytes (consumer config).

default: 52428800 importance: medium

Declaration
public int? FetchMaxBytes { get; set; }
Property Value
Type Description
int?

FetchMinBytes

Minimum number of bytes the broker responds with. If fetch.wait.max.ms expires the accumulated data will be sent to the client regardless of this setting.

default: 1 importance: low

Declaration
public int? FetchMinBytes { get; set; }
Property Value
Type Description
int?

FetchQueueBackoffMs

How long to postpone the next fetch request for a topic+partition in case the current fetch queue thresholds (queued.min.messages or queued.max.messages.kbytes) have been exceeded. This property may need to be decreased if the queue thresholds are set low and the application is experiencing long (~1s) delays between messages. Low values may increase CPU utilization.

default: 1000 importance: medium

Declaration
public int? FetchQueueBackoffMs { get; set; }
Property Value
Type Description
int?

FetchWaitMaxMs

Maximum time the broker may wait to fill the Fetch response with fetch.min.bytes of messages.

default: 500 importance: low

Declaration
public int? FetchWaitMaxMs { get; set; }
Property Value
Type Description
int?

GroupId

Client group id string. All clients sharing the same group.id belong to the same group.

default: '' importance: high

Declaration
public string GroupId { get; set; }
Property Value
Type Description
string

GroupInstanceId

Enable static group membership. Static group members are able to leave and rejoin a group within the configured session.timeout.ms without prompting a group rebalance. This should be used in combination with a larger session.timeout.ms to avoid group rebalances caused by transient unavailability (e.g. process restarts). Requires broker version >= 2.3.0.

default: '' importance: medium

Declaration
public string GroupInstanceId { get; set; }
Property Value
Type Description
string

GroupProtocol

Group protocol to use. Use classic for the original protocol and consumer for the new protocol introduced in KIP-848. Available protocols: classic or consumer. The default is classic, but it will change to consumer in a future release.

default: classic importance: high

Declaration
public GroupProtocol? GroupProtocol { get; set; }
Property Value
Type Description
GroupProtocol?

GroupProtocolType

Group protocol type for the classic group protocol. NOTE: Currently, the only supported group protocol type is consumer.

default: consumer importance: low

Declaration
public string GroupProtocolType { get; set; }
Property Value
Type Description
string

GroupRemoteAssignor

Server-side assignor to use. Leave it null to let the server select a suitable assignor for the group. Available assignors: uniform or range. Default is null.

default: '' importance: medium

Declaration
public string GroupRemoteAssignor { get; set; }
Property Value
Type Description
string

HeartbeatIntervalMs

Group session keepalive heartbeat interval.

default: 3000 importance: low

Declaration
public int? HeartbeatIntervalMs { get; set; }
Property Value
Type Description
int?

IsolationLevel

Controls how to read messages written transactionally: read_committed - only return transactional messages which have been committed. read_uncommitted - return all messages, even transactional messages which have been aborted.

default: read_committed importance: high

Declaration
public IsolationLevel? IsolationLevel { get; set; }
Property Value
Type Description
IsolationLevel?

MaxPartitionFetchBytes

Initial maximum number of bytes per topic+partition to request when fetching messages from the broker. If the client encounters a message larger than this value it will gradually try to increase it until the entire message can be fetched.

default: 1048576 importance: medium

Declaration
public int? MaxPartitionFetchBytes { get; set; }
Property Value
Type Description
int?

MaxPollIntervalMs

Maximum allowed time between calls to consume messages (e.g., rd_kafka_consumer_poll()) for high-level consumers. If this interval is exceeded the consumer is considered failed and the group will rebalance in order to reassign the partitions to another consumer group member. Warning: offset commits may not be possible at this point. Note: it is recommended to set enable.auto.offset.store=false for long-time processing applications and then explicitly store offsets (using offsets_store()) after message processing, to make sure offsets are not auto-committed before processing has finished. The interval is checked two times per second. See KIP-62 for more information.

default: 300000 importance: high

Declaration
public int? MaxPollIntervalMs { get; set; }
Property Value
Type Description
int?

PartitionAssignmentStrategy

The name of one or more partition assignment strategies. The elected group leader will use a strategy supported by all members of the group to assign partitions to group members. If there is more than one eligible strategy, preference is determined by the order of this list (strategies earlier in the list have higher priority). Cooperative and non-cooperative (eager) strategies must not be mixed. Available strategies: range, roundrobin, cooperative-sticky.

default: range,roundrobin importance: medium

Declaration
public PartitionAssignmentStrategy? PartitionAssignmentStrategy { get; set; }
Property Value
Type Description
PartitionAssignmentStrategy?
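Selecting a strategy via the typed enum can be sketched as follows (broker and group values are placeholders); note the caveat above that cooperative and eager strategies must not be mixed within one group:

```csharp
using Confluent.Kafka;

var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",  // placeholder
    GroupId = "example-group",            // placeholder
    // Opt into incremental (cooperative) rebalancing; do not mix this with
    // eager strategies such as Range or RoundRobin in the same group.
    PartitionAssignmentStrategy = PartitionAssignmentStrategy.CooperativeSticky
};
```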

QueuedMaxMessagesKbytes

Maximum number of kilobytes of queued pre-fetched messages in the local consumer queue. If using the high-level consumer this setting applies to the single consumer queue, regardless of the number of partitions. When using the legacy simple consumer or when separate partition queues are used this setting applies per partition. This value may be overshot by fetch.message.max.bytes. This property has higher priority than queued.min.messages.

default: 65536 importance: medium

Declaration
public int? QueuedMaxMessagesKbytes { get; set; }
Property Value
Type Description
int?

QueuedMinMessages

Minimum number of messages per topic+partition librdkafka tries to maintain in the local consumer queue.

default: 100000 importance: medium

Declaration
public int? QueuedMinMessages { get; set; }
Property Value
Type Description
int?

SessionTimeoutMs

Client group session and failure detection timeout. The consumer sends periodic heartbeats (heartbeat.interval.ms) to indicate its liveness to the broker. If no heartbeats are received by the broker for a group member within the session timeout, the broker will remove the consumer from the group and trigger a rebalance. The allowed range is configured with the broker configuration properties group.min.session.timeout.ms and group.max.session.timeout.ms. Also see max.poll.interval.ms.

default: 45000 importance: high

Declaration
public int? SessionTimeoutMs { get; set; }
Property Value
Type Description
int?

Methods

ThrowIfContainsNonUserConfigurable()

Check if any properties have been set that have implications for application logic and therefore shouldn't be set via external configuration, independent of the code. Throw an ArgumentException if so.

Declaration
public ConsumerConfig ThrowIfContainsNonUserConfigurable()
Returns
Type Description
ConsumerConfig
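Because the method returns the config itself, it can be chained onto construction. A sketch in which externally supplied settings (e.g. from a config file; the dictionary contents are placeholders) are validated before use:

```csharp
using System.Collections.Generic;
using Confluent.Kafka;

// Settings loaded from outside the application (placeholder values).
var external = new Dictionary<string, string>
{
    ["session.timeout.ms"] = "30000"
};

// Throws ArgumentException if the external settings include properties
// that should only ever be set in code.
var config = new ConsumerConfig(external).ThrowIfContainsNonUserConfigurable();
```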

Implements

IEnumerable<T>
IEnumerable