按时间查询消息及时释放内存
This commit is contained in:
@@ -8,7 +8,7 @@ import java.time.Duration
|
||||
import java.util
|
||||
import java.util.Properties
|
||||
import scala.collection.immutable
|
||||
import scala.jdk.CollectionConverters.{CollectionHasAsScala, MapHasAsScala}
|
||||
import scala.jdk.CollectionConverters.{CollectionHasAsScala, MapHasAsScala, SeqHasAsJava}
|
||||
|
||||
/**
|
||||
* kafka-console-ui.
|
||||
@@ -60,20 +60,48 @@ class MessageConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConf
|
||||
} else {
|
||||
for ((tp, endOff) <- endOffTable) {
|
||||
if (!terminate) {
|
||||
val recordList = records.records(tp)
|
||||
var recordList = records.records(tp)
|
||||
if (!recordList.isEmpty) {
|
||||
val first = recordList.get(0)
|
||||
if (first.offset() >= endOff) {
|
||||
arrive.remove(tp)
|
||||
} else {
|
||||
res.addAll(recordList)
|
||||
//
|
||||
// (String topic,
|
||||
// int partition,
|
||||
// long offset,
|
||||
// long timestamp,
|
||||
// TimestampType timestampType,
|
||||
// Long checksum,
|
||||
// int serializedKeySize,
|
||||
// int serializedValueSize,
|
||||
// K key,
|
||||
// V value,
|
||||
// Headers headers,
|
||||
// Optional<Integer> leaderEpoch)
|
||||
val nullVList = recordList.asScala.map(record => new ConsumerRecord[Array[Byte], Array[Byte]](record.topic(),
|
||||
record.partition(),
|
||||
record.offset(),
|
||||
record.timestamp(),
|
||||
record.timestampType(),
|
||||
record.checksum(),
|
||||
record.serializedKeySize(),
|
||||
record.serializedValueSize(),
|
||||
record.key(),
|
||||
null,
|
||||
record.headers(),
|
||||
record.leaderEpoch())).toSeq.asJava
|
||||
res.addAll(nullVList)
|
||||
if (recordList.get(recordList.size() - 1).offset() >= endOff) {
|
||||
arrive.remove(tp)
|
||||
}
|
||||
if (recordList != null) {
|
||||
recordList = null
|
||||
}
|
||||
}
|
||||
}
|
||||
if (arrive.isEmpty) {
|
||||
terminate = true;
|
||||
terminate = true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user