class: KafkaConsumer
[41:14] (extern: com.lehman.aussom.KafkaConsumer) extends: object
The Kafka consumer, for direct synchronous polling:

    cons = new KafkaConsumer("broker1:9092", "billing");
    cons.setAutoOffsetReset("earliest");
    cons.connect();
    cons.subscribe([ "orders.created" ]);
    recs = cons.poll(1000);

For continuous consumption under Aussom Server, use KafkaListener instead; the server then owns the thread and lifecycle.

Threading: a Kafka consumer belongs to ONE thread. The wrapper adds no locking; the Kafka client itself rejects cross-thread use. The single exception is wakeup(), which can be called from any thread to break a blocked poll().

Topic-partition maps have the form { "topic": s, "partition": i }. Offsets lists are lists of those maps with an added "offset" (and optional "metadata"). The same shapes are used everywhere in this module.
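To make the two shapes described above concrete, the literals below build one topic-partition map and one offsets list. This is only a sketch: the topic name, offsets, and metadata string are made-up sample values, and the fragment is assumed to sit inside a method of an Aussom class.

```aussom
// A topic-partition map: partition 0 of a sample topic.
tp = { "topic": "orders.created", "partition": 0 };

// An offsets list: topic-partition maps with an added "offset"
// and, optionally, a "metadata" entry (sample values only).
offsets = [
    { "topic": "orders.created", "partition": 0, "offset": 42 },
    { "topic": "orders.created", "partition": 1, "offset": 17, "metadata": "checkpoint" }
];
```

Methods such as assign(), pause(), and seekToBeginning() take a list of the first shape, while commitSync() takes the second.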
Methods
- KafkaConsumer (string Bootstrap, string GroupId = null)
  Creates a consumer for a bootstrap address. Does not connect yet.
  - @p Bootstrap is a string with host:port[,host:port...].
  - @p GroupId is an optional string with the consumer group id (not needed for assign()-only use).
- newConsumer (string Bootstrap, string GroupId = null)
- setClientId (string Id)
  Sets the client id used in broker logs, metrics, and quotas.
  - @p Id is a string with the client id.
  - @r this object
- setAutoOffsetReset (string Mode)
  Sets where to start when the group has no committed offset.
  - @p Mode is a string: earliest, latest (default), or none.
  - @r this object
- setEnableAutoCommit (bool On)
  Enables or disables client auto-commit during poll.
  - @p On is a bool.
  - @r this object
- setMaxPollRecords (int N)
  Sets the maximum number of records one poll() returns.
  - @p N is an int with the record cap.
  - @r this object
- setMaxPollIntervalMs (int Ms)
  Sets the processing deadline between polls before the group evicts this consumer.
  - @p Ms is an int with the interval in milliseconds.
  - @r this object
- setSessionTimeoutMs (int Ms)
  Sets the group session timeout.
  - @p Ms is an int with the timeout in milliseconds.
  - @r this object
- setIsolationLevel (string Level)
  Sets the transactional read isolation.
  - @p Level is a string: read_uncommitted (default) or read_committed.
  - @r this object
- setKeyFormat (string Fmt)
  Sets the key format.
  - @p Fmt is a string: string (default) or bytes (Buffer).
  - @r this object
- setValueFormat (string Fmt)
  Sets the value format.
  - @p Fmt is a string: string (default) or bytes (Buffer).
  - @r this object
- setConfig (string Key, Value)
  Sets any consumer config key directly (group.protocol, group.instance.id, security settings, and the rest). The ssl.*.location keys are rejected here; use the dedicated setters instead.
  - @p Key is a string with the config key.
  - @p Value is the config value.
  - @r this object
- setConfigs (map Cfg)
  Applies a whole config map at once.
  - @p Cfg is a map of config key to value.
  - @r this object
- setSslKeystoreFile (string Path)
  Sets the TLS keystore file. Under Aussom Server the path is resolved through the app_data sandbox.
  - @p Path is a string with the app-relative keystore path.
  - @r this object
- setSslTruststoreFile (string Path)
  Sets the TLS truststore file. Under Aussom Server the path is resolved through the app_data sandbox.
  - @p Path is a string with the app-relative truststore path.
  - @r this object
- connect ()
  Builds the Kafka client from the stored config. Setters raise an error after this point.
  - @r this object
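Because every setter returns this object and connect() freezes the configuration, a typical setup applies all settings first and connects last. The fragment below is a sketch under assumptions: the broker address, group id, client id, and instance id are sample values, and the code is assumed to sit inside a method of an Aussom class.

```aussom
cons = new KafkaConsumer("broker1:9092", "billing");

// Each setter returns this object, so calls can be chained.
cons.setClientId("billing-worker-1").setAutoOffsetReset("earliest");
cons.setEnableAutoCommit(false).setMaxPollRecords(200);

// Keys without a dedicated setter go through setConfig().
cons.setConfig("group.instance.id", "billing-worker-1");

// Build the client last; setters raise an error after this call.
cons.connect();
```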
- subscribe (list Topics, callback OnAssigned = null, callback OnRevoked = null)
  Subscribes to topics. The optional callbacks fire on this thread from inside poll() when the group rebalances, each with one list of topic-partition maps.
  - @p Topics is a list of topic name strings.
  - @p OnAssigned is an optional callback(list Partitions).
  - @p OnRevoked is an optional callback(list Partitions).
  - @r this object
- subscribePattern (string Regex, callback OnAssigned = null, callback OnRevoked = null)
  Subscribes to every topic matching a regex pattern.
  - @p Regex is a string with the topic pattern.
  - @p OnAssigned is an optional callback(list Partitions).
  - @p OnRevoked is an optional callback(list Partitions).
  - @r this object
- assign (list TopicPartitions)
  Manually assigns partitions, with no group coordination and no rebalancing.
  - @p TopicPartitions is a list of topic-partition maps.
  - @r this object
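Manual assignment is often paired with an explicit seek, for example to replay one partition from its first record. The sketch below uses only methods listed on this page; the broker address and topic name are sample values, and the code is assumed to sit inside a method of an Aussom class.

```aussom
// No group id is needed for assign()-only use.
cons = new KafkaConsumer("broker1:9092");
cons.connect();

// Take partition 0 of a sample topic directly, bypassing the group.
tps = [ { "topic": "orders.created", "partition": 0 } ];
cons.assign(tps);

// Rewind to the first record, then read one batch.
cons.seekToBeginning(tps);
recs = cons.poll(1000);

cons.close();
```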
- unsubscribe ()
  Drops the current subscription or assignment.
  - @r this object
- poll (int TimeoutMs)
  Polls for records.
  - @p TimeoutMs is an int with the maximum wait in milliseconds.
  - @r A list of KafkaRecord objects (possibly empty).
- commitSync (list Offsets = null, int TimeoutMs = -1)
  Commits offsets synchronously. With no arguments, commits the offsets of the last poll.
  - @p Offsets is an optional offsets list to commit exactly.
  - @p TimeoutMs is an optional int bound on the commit wait.
  - @r this object
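A common use of commitSync() is to turn auto-commit off and commit only once a batch has been processed. The fragment below is a sketch: the topic name, offset value, and timeouts are sample values, and the code is assumed to sit inside a method of an Aussom class. It also assumes the wrapper follows the underlying Kafka client convention that a committed offset names the next record to read, which this page does not state.

```aussom
cons = new KafkaConsumer("broker1:9092", "billing");
cons.setEnableAutoCommit(false);
cons.connect();
cons.subscribe([ "orders.created" ]);

recs = cons.poll(1000);
// ... process recs here before committing ...

// Commit everything returned by the last poll.
cons.commitSync();

// Or commit an exact position, bounded by a 5000 ms wait.
// Assumption: the offset is the NEXT record to read (Kafka convention).
cons.commitSync([ { "topic": "orders.created", "partition": 0, "offset": 42 } ], 5000);
```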
- commitAsync (callback OnComplete = null)
  Commits the offsets of the last poll asynchronously. The optional callback is invoked from within a later poll() on this thread.
  - @p OnComplete is an optional callback(list Offsets, string ErrText). ErrText is null on success.
  - @r this object
- seek (string Topic, int Partition, int Offset)
  Seeks a partition to an offset for the next poll.
  - @p Topic is a string with the topic name.
  - @p Partition is an int with the partition.
  - @p Offset is an int with the offset.
  - @r this object
- seekToBeginning (list TopicPartitions)
  Seeks partitions back to their first record.
  - @p TopicPartitions is a list of topic-partition maps.
  - @r this object
- seekToEnd (list TopicPartitions)
  Seeks partitions to their end.
  - @p TopicPartitions is a list of topic-partition maps.
  - @r this object
- position (map TopicPartition)
  Gets the next-fetch position for one partition.
  - @p TopicPartition is a topic-partition map.
  - @r An int with the offset of the next record to fetch.
- committed (list TopicPartitions)
  Gets the committed offsets for partitions. Partitions with no committed offset are omitted from the result.
  - @p TopicPartitions is a list of topic-partition maps.
  - @r An offsets list.
- currentLag (map TopicPartition)
  Gets this consumer's lag on one partition.
  - @p TopicPartition is a topic-partition map.
  - @r An int with the lag, or null when not yet known.
- beginningOffsets (list TopicPartitions)
  Gets the earliest available offsets.
  - @p TopicPartitions is a list of topic-partition maps.
  - @r An offsets list.
- endOffsets (list TopicPartitions)
  Gets the latest offsets (the offset the next produced record would get).
  - @p TopicPartitions is a list of topic-partition maps.
  - @r An offsets list.
- offsetsForTimes (list Query)
  Finds the earliest offset at or after a timestamp, per partition. Partitions with no such record are omitted.
  - @p Query is a list of { "topic", "partition", "timestamp" } maps.
  - @r An offsets list with a "timestamp" on each entry.
- pause (list TopicPartitions)
  Pauses fetching from partitions. poll() keeps heartbeating so the consumer stays in its group.
  - @p TopicPartitions is a list of topic-partition maps.
  - @r this object
- resume (list TopicPartitions)
  Resumes fetching from paused partitions.
  - @p TopicPartitions is a list of topic-partition maps.
  - @r this object
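pause() and resume() are typically used together to apply backpressure: fetching stops while slow work completes, but poll() is still called so the consumer stays in its group. The fragment below is a sketch that assumes cons is already connected, subscribed, and has polled at least once so that assignment() is non-empty; it is assumed to sit inside a method of an Aussom class.

```aussom
// Pause every partition currently assigned to this consumer.
parts = cons.assignment();
cons.pause(parts);

// Keep polling while paused so the consumer stays in its group.
// Paused partitions are not fetched, so no records are expected here.
idle = cons.poll(1000);

// ... finish the slow work ...

// Resume fetching from the same partitions.
cons.resume(parts);
recs = cons.poll(1000);
```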
- paused ()
  Gets the currently paused partitions.
  - @r A list of topic-partition maps.
- assignment ()
  Gets the currently assigned partitions.
  - @r A list of topic-partition maps.
- subscription ()
  Gets the subscribed topics.
  - @r A list of topic name strings.
- listTopics ()
  Lists all topics visible to this consumer.
  - @r A map of topic name to partition info list.
- partitionsFor (string Topic)
  Gets partition info for a topic.
  - @p Topic is a string with the topic name.
  - @r A list of { "topic", "partition", "leader", "replicas", "isr" } maps.
- enforceRebalance ()
  Forces a group rebalance on the next poll.
  - @r this object
- groupMetadata ()
  Gets the consumer group metadata.
  - @r A map { "groupId", "generationId", "memberId", "groupInstanceId" }.
- metrics ()
  Gets the client metrics.
  - @r A map of metric group.name to numeric value.
- wakeup ()
  Breaks a blocked poll() from another thread. This is the one thread-safe consumer call.
  - @r this object
- close (int TimeoutMs = -1)
  Closes the consumer, leaving the group cleanly. Idempotent.
  - @p TimeoutMs is an optional int bound on the close wait.
  - @r this object
- isConnected ()
  Reports whether connect() has been called.
  - @r A bool with true when connected.