class: KafkaShareConsumer
[39:14] (extern: com.lehman.aussom.KafkaShareConsumer) extends: object
The Kafka share consumer: queue semantics on a Kafka topic (KIP-932, broker 4.2+). Many consumers in a share group can read the same partitions; each record is individually accepted, released (redelivered), or rejected (poison). The broker's delivery-count limit archives records that fail too many times. cons = new KafkaShareConsumer("broker1:9092", "billing"); cons.setAckMode("explicit"); cons.connect(); cons.subscribe([ "orders.created" ]); recs = cons.poll(1000); // ... handle, then per record: cons.acknowledge(recs[0], "accept"); cons.commitSync(); Threading: same one-thread rule as KafkaConsumer; only wakeup() is safe from another thread.
Methods
-
KafkaShareConsumer (
string Bootstrap, string GroupId)Creates a share consumer. Does not connect yet.
- @p
Bootstrapis a string with host:port[,host:port...]. - @p
GroupIdis a string with the share group id (required).
- @p
-
newShareConsumer (
string Bootstrap, string GroupId) -
setAckMode (
string Mode)Sets the acknowledgement mode.
- @p
Modeis a string: implicit (default; the next poll/commit accepts the previous batch) or explicit (every record must be acknowledged before the next poll). - @r
thisobject
- @p
-
setClientId (
string Id)Sets the client id used in broker logs, metrics, and quotas.
- @p
Idis a string with the client id. - @r
thisobject
- @p
-
setKeyFormat (
string Fmt)Sets the key format.
- @p
Fmtis a string: string (default) or bytes (Buffer). - @r
thisobject
- @p
-
setValueFormat (
string Fmt)Sets the value format.
- @p
Fmtis a string: string (default) or bytes (Buffer). - @r
thisobject
- @p
-
setConfig (
string Key, Value)Sets any share consumer config key directly. The ssl.*.location keys are rejected here - use the dedicated setters.
- @p
Keyis a string with the config key. - @p
Valueis the config value. - @r
thisobject
- @p
-
setConfigs (
map Cfg)Applies a whole config map at once.
- @p
Cfgis a map of config key to value. - @r
thisobject
- @p
-
setSslKeystoreFile (
string Path)Sets the TLS keystore file. Under Aussom Server the path is resolved through the app_data sandbox.
- @p
Pathis a string with the app-relative keystore path. - @r
thisobject
- @p
-
setSslTruststoreFile (
string Path)Sets the TLS truststore file. Under Aussom Server the path is resolved through the app_data sandbox.
- @p
Pathis a string with the app-relative truststore path. - @r
thisobject
- @p
-
connect ()
Builds the Kafka client from the stored config.
- @r
thisobject
- @r
-
subscribe (
list Topics)Subscribes to topics. Share groups have no pattern subscribe or manual assignment.
- @p
Topicsis a list of topic name strings. - @r
thisobject
- @p
-
unsubscribe ()
Drops the subscription.
- @r
thisobject
- @r
-
subscription ()
Gets the subscribed topics.
- @r
Alist of topic name strings.
- @r
-
poll (
int TimeoutMs)Polls for acquired records.
- @p
TimeoutMsis an int with the maximum wait. - @r
Alist of KafkaRecord objects (possibly empty).
- @p
-
acknowledge (
object Record, string Type = "accept")Acknowledges one record from the last poll (explicit mode).
- @p
Recordis the KafkaRecord to acknowledge. - @p
Typeis an optional string: accept (default), release (redeliver to someone), reject (poison, never redeliver), or renew (extend the acquisition lock). - @r
thisobject
- @p
-
commitSync (
int TimeoutMs = -1)Commits the acknowledgements to the broker synchronously.
- @p
TimeoutMsis an optional int bound on the commit wait. - @r
Alist of { "topic", "partition", "error" } maps - empty when every acknowledgement was applied.
- @p
-
commitAsync ()
Commits the acknowledgements asynchronously.
- @r
thisobject
- @r
-
setOnAcknowledgementCommit (
callback Cb)Sets a callback invoked on this thread (from poll/commit/ close) as acknowledgement commits complete.
- @p
Cbis a callback taking (list Entries, string ErrText); each entry is { "topic", "partition", "offsets" } and ErrText is null on success. - @r
thisobject
- @p
-
acquisitionLockTimeoutMs ()
Gets the broker's record acquisition lock timeout once the client has learned it.
- @r
Anint with the timeout in milliseconds, or null.
- @r
-
metrics ()
Gets the client metrics.
- @r
Amap of metric group.name to numeric value.
- @r
-
wakeup ()
Breaks a blocked poll() from another thread - the one thread-safe call.
- @r
thisobject
- @r
-
close (
int TimeoutMs = -1)Closes the share consumer, releasing unacknowledged records. Idempotent.
- @p
TimeoutMsis an optional int bound on the close wait. - @r
thisobject
- @p
-
isConnected ()
Reports whether connect() has been called.
- @r
Abool with true when connected.
- @r