fix按时间查询消息bug,加长页面请求超时
This commit is contained in:
@@ -55,25 +55,26 @@ class MessageConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConf
|
||||
} else {
|
||||
val records = consumer.poll(Duration.ofMillis(timeoutMs))
|
||||
|
||||
for ((tp, endOff) <- endOffTable) {
|
||||
if (!terminate) {
|
||||
val recordList = records.records(tp)
|
||||
if (recordList.isEmpty) {
|
||||
arrive.remove(tp)
|
||||
} else {
|
||||
val first = recordList.get(0)
|
||||
if (first.offset() >= endOff) {
|
||||
arrive.remove(tp)
|
||||
} else {
|
||||
res.addAll(recordList)
|
||||
if (recordList.get(recordList.size() - 1).offset() >= endOff) {
|
||||
if (records.isEmpty) {
|
||||
terminate = true
|
||||
} else {
|
||||
for ((tp, endOff) <- endOffTable) {
|
||||
if (!terminate) {
|
||||
val recordList = records.records(tp)
|
||||
if (!recordList.isEmpty) {
|
||||
val first = recordList.get(0)
|
||||
if (first.offset() >= endOff) {
|
||||
arrive.remove(tp)
|
||||
} else {
|
||||
res.addAll(recordList)
|
||||
if (recordList.get(recordList.size() - 1).offset() >= endOff) {
|
||||
arrive.remove(tp)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (arrive.isEmpty) {
|
||||
terminate = true;
|
||||
if (arrive.isEmpty) {
|
||||
terminate = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user