Kafka Producer Blocking Points:

max.block.ms: the maximum time KafkaProducer.send may block. Although send is asynchronous, it performs several synchronous (blocking) operations internally, and max.block.ms bounds their total duration. It includes:
Metadata retrieval time: send may need to fetch topic partition metadata before it can assign the record to a partition and batch it. metadata.max.age.ms controls how long metadata is cached in the client before a refresh is forced.

Buffer allocation time: RecordAccumulator.append must wait until the shared buffer pool has enough free space and then acquire the memory. The acquired memory is returned to the pool in the following situations:

The batch is aborted under abnormal conditions:
Sender.maybeAbortBatches -> RecordAccumulator.abortBatches (if the transaction manager contains an error)
Sender.run -> RecordAccumulator.abortIncompleteBatches (if the sender is forcibly closed)

The sender receives a produce response from the broker:
Sender.completeBatch or Sender.failBatch
Reasons for failing a batch (see org.apache.kafka.clients.producer.internals.Sender):
1. Producer id changed
2. Exception caught on the send response (retries exhausted or the exception is not retriable)
3. Batch expired (the batch stayed in the buffer longer than request.timeout.ms; clients from Kafka 2.1 onward use delivery.timeout.ms for this check)
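
The buffer allocation wait described above can be illustrated with a toy model. This is only a sketch under stated assumptions: ToyBufferPool and its methods are hypothetical names, the semaphore stands in for the shared pool sized by buffer.memory, and none of this is Kafka's actual BufferPool code. It shows an allocation that blocks for at most maxBlockMs and succeeds again once a finished batch returns its memory.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** Toy model of a bounded buffer pool; NOT Kafka's BufferPool implementation. */
final class ToyBufferPool {
    // Each permit represents one free byte of the pool (the role of buffer.memory).
    private final Semaphore freeBytes;

    ToyBufferPool(int totalBytes) {
        this.freeBytes = new Semaphore(totalBytes);
    }

    /** Waits up to maxBlockMs for `size` free bytes; returns false on timeout or interrupt. */
    boolean tryAllocate(int size, long maxBlockMs) {
        try {
            return freeBytes.tryAcquire(size, maxBlockMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }

    /** Returns memory to the pool, as happens when a batch completes, fails, or is aborted. */
    void deallocate(int size) {
        freeBytes.release(size);
    }

    int availableBytes() {
        return freeBytes.availablePermits();
    }
}
```

In this model a false return corresponds to the point where the real client reports a timeout for the record, because the pool stayed full for the whole max.block.ms budget.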

Kafka Producer Components:

RecordAccumulator: groups producer records into batches by topic partition. A new batch is created when the last existing batch for that partition is full (the record would make it exceed batch.size).
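
The batching rule above can be sketched as follows. ToyAccumulator, Batch, and batchCount are hypothetical names for illustration, partitions are represented as plain strings, and the real RecordAccumulator handles much more (compression, memory allocation, concurrency). The sketch only shows one queue of batches per partition and the opening of a new batch when the last one cannot take the record.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of per-partition batching; NOT Kafka's RecordAccumulator. */
final class ToyAccumulator {
    /** One batch: the records appended so far and their total size in bytes. */
    static final class Batch {
        final List<byte[]> records = new ArrayList<>();
        int sizeInBytes = 0;
    }

    private final int batchSize; // plays the role of batch.size
    private final Map<String, Deque<Batch>> batches = new HashMap<>();

    ToyAccumulator(int batchSize) {
        this.batchSize = batchSize;
    }

    /** Appends to the partition's last batch, opening a new batch if the record does not fit. */
    void append(String topicPartition, byte[] record) {
        Deque<Batch> queue = batches.computeIfAbsent(topicPartition, tp -> new ArrayDeque<>());
        Batch last = queue.peekLast();
        if (last == null || last.sizeInBytes + record.length > batchSize) {
            last = new Batch();
            queue.addLast(last);
        }
        last.records.add(record);
        last.sizeInBytes += record.length;
    }

    /** Number of batches currently queued for the partition (0 if none). */
    int batchCount(String topicPartition) {
        Deque<Batch> queue = batches.get(topicPartition);
        return queue == null ? 0 : queue.size();
    }
}
```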

Sender: drains batches from the RecordAccumulator and groups them by node id (batches targeting the same broker can be put into a single request), making sure the total size of the batches drained for a node does not exceed max.request.size. It converts the producer batches into a ClientRequest and sends it via NetworkClient. After sending the request, the sender polls for responses. If no batch is ready to send yet, a short delay (up to linger.ms) is introduced before the next run.
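
The drain step can be sketched in the same toy style. ToyDrain, ReadyBatch, and drain are hypothetical names, and the logic is a simplification rather than the real Sender or RecordAccumulator.drain code. It shows ready batches being grouped by the leader node of their partition, with a per-node byte budget standing in for max.request.size.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of draining ready batches per broker node; NOT Kafka's Sender. */
final class ToyDrain {
    /** A batch that is ready to send, tagged with the node that leads its partition. */
    static final class ReadyBatch {
        final String topicPartition;
        final int leaderNodeId;
        final int sizeInBytes;

        ReadyBatch(String topicPartition, int leaderNodeId, int sizeInBytes) {
            this.topicPartition = topicPartition;
            this.leaderNodeId = leaderNodeId;
            this.sizeInBytes = sizeInBytes;
        }
    }

    /** Groups batches by leader node; a batch that would exceed the node's budget is skipped. */
    static Map<Integer, List<ReadyBatch>> drain(List<ReadyBatch> ready, int maxRequestSize) {
        Map<Integer, List<ReadyBatch>> byNode = new LinkedHashMap<>();
        Map<Integer, Integer> bytesByNode = new HashMap<>();
        for (ReadyBatch batch : ready) {
            int used = bytesByNode.getOrDefault(batch.leaderNodeId, 0);
            if (used + batch.sizeInBytes > maxRequestSize) {
                continue; // in this model the batch simply waits for a later drain
            }
            byNode.computeIfAbsent(batch.leaderNodeId, id -> new ArrayList<>()).add(batch);
            bytesByNode.put(batch.leaderNodeId, used + batch.sizeInBytes);
        }
        return byNode;
    }
}
```

Each entry of the returned map corresponds to one request to one broker, which is why records for different partitions on the same host can travel together.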

NetworkClient: sends ClientRequests over TCP. NetworkClient sends not only produce requests but also other requests, such as metadata updates.





Message order guarantee:

NetworkClient contains InFlightRequests, which holds requests that have been sent but have not yet received a response. The configuration max.in.flight.requests.per.connection controls the maximum number of in-flight requests per connection. Larger values improve throughput, at the cost of possible reordering on redelivery (when a later request is acknowledged but an earlier one fails and has to be resent). Once this threshold is reached, NetworkClient treats the node as not ready and accepts no more requests for it; attempting to send anyway throws IllegalStateException (NetworkClient.doSend).
Message order is guaranteed if max.in.flight.requests.per.connection is 1.
Mute partition: when message order must be guaranteed (max.in.flight.requests.per.connection is 1) and a topic partition has a record batch in flight, that partition is muted, meaning the next drain operation of the RecordAccumulator excludes it. The partition is unmuted once the batch completes.
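Starting from version 0.11, KafkaProducer has a configuration parameter enable.idempotence that also guarantees message order. With it enabled, max.in.flight.requests.per.connection can be greater than 1 but cannot exceed 5 (the initial 0.11.0 release still required 1; later releases relaxed the limit to 5).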
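
The ordering rules above can be written down as a small check. OrderingConfig and its two methods are hypothetical helpers, not part of the Kafka client; only the property names are real producer settings. The fallback values used when a property is missing are assumptions (5 in-flight requests, idempotence off, as in older clients), so the result should be read as a sketch of the rule rather than a statement about any specific client version.

```java
import java.util.Properties;

/** Hypothetical helper that encodes the ordering rule from these notes. */
final class OrderingConfig {
    /** Producer settings that keep ordering while still pipelining requests (illustrative values). */
    static Properties idempotentOrdered() {
        Properties p = new Properties();
        p.setProperty("enable.idempotence", "true");
        p.setProperty("acks", "all"); // idempotence requires acks=all
        p.setProperty("max.in.flight.requests.per.connection", "5"); // upper bound with idempotence
        return p;
    }

    /** True if at most 1 request is in flight, or idempotence is on with at most 5 in flight. */
    static boolean preservesOrder(Properties p) {
        // Assumed fallbacks when a property is absent: 5 in flight, idempotence disabled.
        int inFlight = Integer.parseInt(p.getProperty("max.in.flight.requests.per.connection", "5"));
        boolean idempotent = Boolean.parseBoolean(p.getProperty("enable.idempotence", "false"));
        return inFlight == 1 || (idempotent && inFlight <= 5);
    }
}
```

The Properties object returned by idempotentOrdered could be merged into the configuration passed to a KafkaProducer, together with the usual bootstrap and serializer settings.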

Metrics and Troubleshooting:

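Sender.handleProduceResponse invokes sensors.recordLatency, which records the latency between sending a request and receiving its response (exposed through producer metrics such as request-latency-avg and request-latency-max). This is useful for troubleshooting sender-related issues.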