class: KafkaProducer
[34:14] (extern: com.lehman.aussom.KafkaProducer) extends: object
The Kafka producer. Configure it with the typed setters and the setConfig/setConfigs passthrough; connect() then builds the client:

    prod = new KafkaProducer("broker1:9092");
    prod.connect();
    md = prod.send("orders.created", "o-1001", "{\"qty\": 3}");

send() is synchronous: it blocks on the broker acknowledgement (bounded by delivery.timeout.ms) and returns the record metadata. sendAsync() is fire-and-forget; failures are surfaced by flush() and getLastAsyncError(). The producer is thread-safe once connected.
Methods
- KafkaProducer (string Bootstrap)
  Creates a producer for a bootstrap address. Does not connect yet.
  - @p Bootstrap is a string with host:port[,host:port...].
- newProducer (string Bootstrap)
- setClientId (string Id)
  Sets the client id used in broker logs, metrics, and quotas.
  - @p Id is a string with the client id.
  - @r this object
- setAcks (string Acks)
  Sets the acknowledgement level.
  - @p Acks is a string: all (default), 1, or 0.
  - @r this object
- setCompression (string Type)
  Sets the compression codec for record batches.
  - @p Type is a string: none, gzip, snappy, lz4, or zstd.
  - @r this object
- setLingerMs (int Ms)
  Sets how long the producer waits to fill a batch before sending.
  - @p Ms is an int with the linger time in milliseconds.
  - @r this object
- setBatchSize (int Bytes)
  Sets the maximum batch size.
  - @p Bytes is an int with the batch size in bytes.
  - @r this object
- setEnableIdempotence (bool On)
  Enables or disables the idempotent producer (client default is enabled).
  - @p On is a bool: true to enable, false to disable.
  - @r this object
- setTransactionalId (string Id)
  Sets the transactional id, which enables the transactional producer (initTransactions / beginTransaction / commitTransaction).
  - @p Id is a string with the transactional id.
  - @r this object
- setDeliveryTimeoutMs (int Ms)
  Sets the delivery timeout (delivery.timeout.ms), the upper bound on how long a synchronous send() blocks.
  - @p Ms is an int with the delivery timeout in milliseconds.
  - @r this object
- setRequestTimeoutMs (int Ms)
  Sets the per-request broker timeout.
  - @p Ms is an int with the request timeout in milliseconds.
  - @r this object
- setKeyFormat (string Fmt)
  Sets the key format.
  - @p Fmt is a string: string (default) or bytes (Buffer).
  - @r this object
- setValueFormat (string Fmt)
  Sets the value format.
  - @p Fmt is a string: string (default) or bytes (Buffer).
  - @r this object
- setConfig (string Key, Value)
  Sets any producer config key directly, including security keys. The ssl.keystore.location and ssl.truststore.location keys are rejected here; use setSslKeystoreFile / setSslTruststoreFile instead.
  - @p Key is a string with the config key.
  - @p Value is the config value.
  - @r this object
- setConfigs (map Cfg)
  Applies a whole config map at once.
  - @p Cfg is a map of config key to value.
  - @r this object
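Because every setter returns this object, configuration calls can be chained before connect(). The fragment below is a minimal sketch of that pattern, not a confirmed recipe: it assumes the module that provides KafkaProducer is already included, the client id and tuning values are made up for illustration, and whether numeric config values should be passed as ints or strings through setConfigs is an assumption.

```aussom
// Fragment meant to live inside a method body.
prod = new KafkaProducer("broker1:9092");

// Typed setters return this object, so they can be chained.
prod.setClientId("orders-service").setAcks("all").setCompression("lz4");
prod.setLingerMs(20).setBatchSize(65536);

// Keys without a typed setter go through the passthrough.
// "max.request.size" is a standard Kafka producer key; the value is illustrative.
prod.setConfigs({ "max.request.size": 1048576 });

// Setters error after this point, so connect() comes last.
prod.connect();
```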
- setSslKeystoreFile (string Path)
  Sets the TLS keystore file. Under Aussom Server the path is resolved through the app_data sandbox.
  - @p Path is a string with the app-relative keystore path.
  - @r this object
- setSslTruststoreFile (string Path)
  Sets the TLS truststore file. Under Aussom Server the path is resolved through the app_data sandbox.
  - @p Path is a string with the app-relative truststore path.
  - @r this object
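A TLS setup would presumably combine the two file setters with the setConfig passthrough for the remaining security keys. The sketch below assumes the Kafka module is already included; the port, file paths, and passwords are placeholders, while security.protocol, ssl.truststore.password, and ssl.keystore.password are standard Kafka client config keys.

```aussom
// Fragment meant to live inside a method body.
prod = new KafkaProducer("broker1:9093");      // hypothetical TLS listener

prod.setConfig("security.protocol", "SSL");

// File locations go through the dedicated setters, not setConfig.
prod.setSslTruststoreFile("certs/truststore.jks");   // hypothetical app-relative path
prod.setSslKeystoreFile("certs/keystore.jks");       // hypothetical app-relative path

// Passwords are ordinary config keys; the values here are placeholders.
prod.setConfig("ssl.truststore.password", "changeit");
prod.setConfig("ssl.keystore.password", "changeit");

prod.connect();
```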
- connect ()
  Builds the Kafka client from the stored config. Calling a setter after this point raises an error.
  - @r this object
- send (string Topic, Key, Value, map Headers = null, int Partition = -1, int TimestampMs = -1)
  Sends one record and blocks on the broker acknowledgement.
  - @p Topic is a string with the topic name.
  - @p Key is the record key (a string or Buffer, per the key format), or null.
  - @p Value is the record value, or null (a tombstone).
  - @p Headers is an optional map of header name to value.
  - @p Partition is an optional int to pin a partition (-1 lets the partitioner choose).
  - @p TimestampMs is an optional int record timestamp (-1 lets the broker/client stamp it).
  - @r A map with the keys "topic", "partition", "offset", and "timestamp".
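The optional arguments of send() are positional, so a call can stop at whichever one it needs. The fragment below sketches three variants under the assumption that prod is an already connected KafkaProducer using the default string formats; the topic, keys, payloads, and header are illustrative.

```aussom
// Fragment; assumes prod is a connected KafkaProducer.

// Plain send: blocks until the broker acknowledges, then returns the metadata map.
md = prod.send("orders.created", "o-1001", "qty=3");

// With headers and a pinned partition.
prod.send("orders.created", "o-1002", "qty=1", { "source": "checkout" }, 0);

// A null value writes a tombstone for the key.
prod.send("orders.created", "o-1001", null);
```

The returned map carries "topic", "partition", "offset", and "timestamp" for the acknowledged record.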
- sendAsync (string Topic, Key, Value, map Headers = null, int Partition = -1, int TimestampMs = -1)
  Sends one record fire-and-forget. Failures are counted and surfaced by flush() / getLastAsyncError().
  - @p Topic is a string with the topic name.
  - @p Key is the record key, or null.
  - @p Value is the record value, or null.
  - @p Headers is an optional map of header name to value.
  - @p Partition is an optional int to pin a partition.
  - @p TimestampMs is an optional int record timestamp.
  - @r this object
- flush ()
  Flushes all buffered sends.
  - @r An int with the sendAsync failure count since the last flush (0 means everything was delivered).
- getLastAsyncError ()
  Gets the text of the most recent sendAsync failure.
  - @r A string with the error text, or null.
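Since sendAsync() does not report failures itself, a batch of asynchronous sends is normally followed by flush() and a check of its return value. The fragment below is a sketch of that pattern, assuming prod is an already connected KafkaProducer; the topic and payloads are illustrative and the error handling is left as a comment.

```aussom
// Fragment; assumes prod is a connected KafkaProducer.
prod.sendAsync("orders.created", "o-1001", "qty=3");
prod.sendAsync("orders.created", "o-1002", "qty=1");

// flush() waits for the buffered sends and returns the failure count.
failed = prod.flush();
if (failed > 0) {
    // Only the most recent failure text is kept.
    err = prod.getLastAsyncError();
    // Report or retry here, for example by resending with the synchronous send().
}
```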
- partitionsFor (string Topic)
  Gets partition info for a topic.
  - @p Topic is a string with the topic name.
  - @r A list of maps with the keys "topic", "partition", "leader", "replicas", and "isr".
- metrics ()
  Gets the client metrics.
  - @r A map of metric name (group.name) to numeric value.
- initTransactions ()
  Initializes the transactional producer. Requires setTransactionalId before connect().
  - @r this object
- beginTransaction ()
  Begins a transaction.
  - @r this object
- commitTransaction ()
  Commits the open transaction.
  - @r this object
- abortTransaction ()
  Aborts the open transaction.
  - @r this object
- sendOffsetsToTransaction (list Offsets, object Consumer)
  Adds consumed offsets to the open transaction for exactly-once consume-transform-produce.
  - @p Offsets is a list of maps with the keys "topic", "partition", and "offset"; each offset is the NEXT offset to read (last consumed + 1).
  - @p Consumer is the KafkaConsumer whose group consumed them.
  - @r this object
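The transactional methods above fit together roughly as follows. This is a sketch under stated assumptions, not a confirmed sequence: it assumes the Kafka module is already included, that initTransactions() is called after connect(), and that consumer is an existing KafkaConsumer that has just processed offset 41 of partition 0. The transactional id, topics, and payload are illustrative.

```aussom
// Fragment; consumer is assumed to be an existing KafkaConsumer.
prod = new KafkaProducer("broker1:9092");
prod.setTransactionalId("orders-tx-1");   // must be set before connect()
prod.connect();
prod.initTransactions();

prod.beginTransaction();
prod.send("orders.enriched", "o-1001", "qty=3;ok=true");

// The last consumed offset was 41, so the offset to commit is 42 (the next one to read).
offsets = [ { "topic": "orders.created", "partition": 0, "offset": 42 } ];
prod.sendOffsetsToTransaction(offsets, consumer);

prod.commitTransaction();
```

If any step between beginTransaction() and commitTransaction() fails, abortTransaction() discards the open transaction, including the offsets added to it.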
- close (int TimeoutMs = -1)
  Closes the producer. Idempotent: calling it more than once is safe.
  - @p TimeoutMs is an optional int bound on the close wait, in milliseconds.
  - @r this object
- isConnected ()
  Reports whether connect() has been called.
  - @r A bool: true when connected.