From fa2192d42a1ad0afce0bae3672c9aa850461d765 Mon Sep 17 00:00:00 2001 From: Chris Jackson Date: Mon, 20 Jan 2025 22:10:19 +1300 Subject: [PATCH] Update logger to provide history and improve performance Signed-off-by: Chris Jackson --- .../openhab/core/io/websocket/log/LogDTO.java | 30 ++- .../core/io/websocket/log/LogFilterDTO.java | 28 +++ .../core/io/websocket/log/LogWebSocket.java | 196 +++++++++++++++--- .../io/websocket/log/LogWebSocketAdapter.java | 18 +- 4 files changed, 234 insertions(+), 38 deletions(-) create mode 100644 bundles/org.openhab.core.io.websocket/src/main/java/org/openhab/core/io/websocket/log/LogFilterDTO.java diff --git a/bundles/org.openhab.core.io.websocket/src/main/java/org/openhab/core/io/websocket/log/LogDTO.java b/bundles/org.openhab.core.io.websocket/src/main/java/org/openhab/core/io/websocket/log/LogDTO.java index e35c577d517..e555b77de2d 100644 --- a/bundles/org.openhab.core.io.websocket/src/main/java/org/openhab/core/io/websocket/log/LogDTO.java +++ b/bundles/org.openhab.core.io.websocket/src/main/java/org/openhab/core/io/websocket/log/LogDTO.java @@ -15,20 +15,34 @@ import java.util.Date; import org.eclipse.jdt.annotation.NonNullByDefault; -import org.eclipse.jdt.annotation.Nullable; -import org.openhab.core.io.websocket.event.EventDTO; import org.osgi.service.log.LogLevel; /** - * The {@link EventDTO} is used for serialization and deserialization of events + * The {@link LogDTO} is used for serialization and deserialization of log messages * * @author Jan N. Klug - Initial contribution + * @author Chris Jackson - Add sequence and make Comparable based on sequence */ @NonNullByDefault -public class LogDTO { - public @Nullable String loggerName; - public @Nullable LogLevel level; - public @Nullable Date timestamp; +public class LogDTO implements Comparable { + public String loggerName; + public LogLevel level; + public Date timestamp; public long unixtime; - public @Nullable String message; + public String message; + public long sequence; + + public LogDTO(long sequence, String loggerName, LogLevel level, long unixtime, String message) { + this.sequence = sequence; + this.loggerName = loggerName; + this.level = level; + this.timestamp = new Date(unixtime); + this.unixtime = unixtime; + this.message = message; + } + + @Override + public int compareTo(LogDTO o) { + return (int) (sequence - o.sequence); + } } diff --git a/bundles/org.openhab.core.io.websocket/src/main/java/org/openhab/core/io/websocket/log/LogFilterDTO.java b/bundles/org.openhab.core.io.websocket/src/main/java/org/openhab/core/io/websocket/log/LogFilterDTO.java new file mode 100644 index 00000000000..0b94fe74006 --- /dev/null +++ b/bundles/org.openhab.core.io.websocket/src/main/java/org/openhab/core/io/websocket/log/LogFilterDTO.java @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2010-2025 Contributors to the openHAB project + * + * See the NOTICE file(s) distributed with this work for additional + * information. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.openhab.core.io.websocket.log; + +import java.util.List; + +/** + * The {@link LogFilterDTO} is used for serialization and deserialization of log filters + * + * @author Chris Jackson - Initial contribution + */ +public class LogFilterDTO { + + public Long timeStart; + public Long timeStop; + public List loggerNames; + public Long sequenceStart; +} diff --git a/bundles/org.openhab.core.io.websocket/src/main/java/org/openhab/core/io/websocket/log/LogWebSocket.java b/bundles/org.openhab.core.io.websocket/src/main/java/org/openhab/core/io/websocket/log/LogWebSocket.java index 939d16cb9c7..4210c56d2d9 100644 --- a/bundles/org.openhab.core.io.websocket/src/main/java/org/openhab/core/io/websocket/log/LogWebSocket.java +++ b/bundles/org.openhab.core.io.websocket/src/main/java/org/openhab/core/io/websocket/log/LogWebSocket.java @@ -13,11 +13,18 @@ package org.openhab.core.io.websocket.log; import java.io.IOException; -import java.util.Date; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Enumeration; +import java.util.Iterator; import java.util.List; import java.util.Objects; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; import java.util.function.Predicate; import java.util.regex.Pattern; +import java.util.stream.Collectors; import org.eclipse.jdt.annotation.NonNullByDefault; import org.eclipse.jdt.annotation.Nullable; @@ -29,6 +36,7 @@ import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError; import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage; import org.eclipse.jetty.websocket.api.annotations.WebSocket; +import org.openhab.core.common.ThreadPoolManager; import org.osgi.service.log.LogEntry; import org.osgi.service.log.LogListener; import org.slf4j.Logger; @@ -39,9 +47,17 @@ import com.google.gson.reflect.TypeToken; /** - * The {@link LogWebSocket} is the WebSocket implementation for logs + * The {@link LogWebSocket} is the WebSocket implementation for logs. + * + * This supports sending of history, and provides a method of managing message cadence. + * When a client connects, it must send a filter request before the server will send any logs. This triggers the sending + * of history. + * + * Live logs are sent as individual messages if they are received with sufficient spacing. When logs come in very + * quickly, they are clustered together and sent as an array after up to 100mS. * * @author Jan N. Klug - Initial contribution + * @author Chris Jackson - Add history and improve performance using arrays */ @WebSocket @NonNullByDefault @@ -51,6 +67,9 @@ public class LogWebSocket implements LogListener { private static final TypeToken> STRING_LIST_TYPE = (TypeToken>) TypeToken .getParameterized(List.class, String.class); + private final static int SEND_PERIOD = 100; // Minimum allowable time between log packets (in milliseconds) + private final static long FIRST_SEQUENCE = 0; + private final Logger logger = LoggerFactory.getLogger(LogWebSocket.class); private final LogWebSocketAdapter wsAdapter; @@ -58,19 +77,31 @@ public class LogWebSocket implements LogListener { private @Nullable Session session; private @Nullable RemoteEndpoint remoteEndpoint; - private String remoteIdentifier = ""; + + private final ScheduledExecutorService scheduledExecutorService; + private @Nullable ScheduledFuture commitScheduledFuture; + + private long lastSentTime = 0; + private List deferredLogs = new ArrayList<>(); + + private boolean enabled = false; + private long lastSequence = FIRST_SEQUENCE; private List loggerPatterns = List.of(); public LogWebSocket(Gson gson, LogWebSocketAdapter wsAdapter) { this.wsAdapter = wsAdapter; this.gson = gson; + + scheduledExecutorService = ThreadPoolManager.getScheduledPool("LogWebSocket"); } @OnWebSocketClose public void onClose(int statusCode, String reason) { - this.wsAdapter.unregisterListener(this); - remoteIdentifier = ""; + if (enabled) { + this.wsAdapter.unregisterListener(this); + } + stopDeferredScheduledFuture(); this.session = null; this.remoteEndpoint = null; } @@ -80,23 +111,104 @@ public void onConnect(Session session) { this.session = session; RemoteEndpoint remoteEndpoint = session.getRemote(); this.remoteEndpoint = remoteEndpoint; - this.remoteIdentifier = remoteEndpoint.getInetSocketAddress().toString(); - this.wsAdapter.registerListener(this); } @OnWebSocketMessage public void onText(String message) { + // Detect empty message (keepalive) and ignore + if (message.equals("{}")) { + return; + } + + // Defer sending live logs while we process the history + lastSentTime = Long.MAX_VALUE; + stopDeferredScheduledFuture(); + + // Enable log messages + if (!enabled) { + this.wsAdapter.registerListener(this); + enabled = true; + } + RemoteEndpoint remoteEndpoint = this.remoteEndpoint; if (session == null || remoteEndpoint == null) { // no connection or no remote endpoint , do nothing this is possible due to async behavior return; } + LogFilterDTO logFilterDto; try { - loggerPatterns = gson.fromJson(message, STRING_LIST_TYPE).stream().map(Pattern::compile).toList(); + logFilterDto = gson.fromJson(message, LogFilterDTO.class); } catch (JsonParseException e) { - logger.warn("Failed to parse '{}' to a list of subscribed loggers", message); + logger.warn("Failed to parse '{}' to a valid log filter object", message); + return; + } + + loggerPatterns = logFilterDto.loggerNames == null ? List.of() + : logFilterDto.loggerNames.stream().map(Pattern::compile).toList(); + + Long timeStart; + Long timeStop; + if (logFilterDto.timeStart != null) { + timeStart = logFilterDto.timeStart; + } else { + timeStart = Long.MIN_VALUE; + } + if (logFilterDto.timeStop != null) { + timeStop = logFilterDto.timeStop; + } else { + timeStop = Long.MAX_VALUE; + } + + Long sequenceStart; + if (logFilterDto.sequenceStart != null) { + sequenceStart = logFilterDto.sequenceStart; + } else { + sequenceStart = lastSequence; + } + + List logs = new ArrayList<>(); + for (Enumeration history = wsAdapter.getLog(); history.hasMoreElements();) { + logs.add(history.nextElement()); + } + + if (logs.isEmpty()) { + lastSentTime = 0; + return; + } + + Predicate withinTimeRange = log -> (log.getTime() >= timeStart) && (log.getTime() <= timeStop); + Predicate withinSequence = log -> log.getSequence() > sequenceStart; + Predicate nameMatchesAnyPattern = log -> loggerPatterns.stream() + .anyMatch(pattern -> pattern.matcher(log.getLoggerName()).matches()); + + List filteredEvents = logs.stream().filter(withinTimeRange.and(withinSequence)) + .collect(Collectors.toList()); + // List filteredEvents = logs.stream().filter(withinTimeRange.and(nameMatchesAnyPattern)) + // .collect(Collectors.toList()); + List dtoList = filteredEvents.stream().map(this::map).collect(Collectors.toList()); + Collections.sort(dtoList); + + try { + sendMessage(gson.toJson(dtoList)); + } catch (IOException e) { + } + lastSentTime = System.currentTimeMillis(); + + // Remove any duplicates from the live log buffer + long newestSequence = logs.get(0).getSequence(); + synchronized (deferredLogs) { + Iterator iterator = deferredLogs.iterator(); + while (iterator.hasNext()) { + LogDTO value = iterator.next(); + if (value.sequence <= newestSequence) { + iterator.remove(); + } + } } + + // Continue with live logs... + flush(); } @OnWebSocketError @@ -120,28 +232,64 @@ private synchronized void sendMessage(String message) throws IOException { @Override public void logged(@NonNullByDefault({}) LogEntry logEntry) { - if (!loggerPatterns.isEmpty() && loggerPatterns.stream().noneMatch(logMatch(logEntry))) { + if (!loggerPatterns.isEmpty() && loggerPatterns.stream().noneMatch(logPatternMatch(logEntry))) { return; } - try { - LogDTO logDTO = map(logEntry); - sendMessage(gson.toJson(logDTO)); - } catch (IOException e) { - // Fail silently! + + LogDTO logDTO = map(logEntry); + lastSequence = logEntry.getSequence(); + + // If the last message sent was less than SEND_PERIOD ago, then we just buffer + if (lastSentTime > System.currentTimeMillis() - SEND_PERIOD) { + // Start the timer if this is the first deferred log + synchronized (deferredLogs) { + if (deferredLogs.isEmpty()) { + commitScheduledFuture = scheduledExecutorService.schedule(this::flush, + lastSentTime + SEND_PERIOD - System.currentTimeMillis(), TimeUnit.MILLISECONDS); + } + + deferredLogs.add(logDTO); + } + } else { + lastSentTime = System.currentTimeMillis(); + try { + sendMessage(gson.toJson(logDTO)); + } catch (IOException e) { + // Fail silently! + } } } - private static Predicate logMatch(LogEntry logEntry) { + private static Predicate logPatternMatch(LogEntry logEntry) { return pattern -> pattern.matcher(logEntry.getLoggerName()).matches(); } - private static LogDTO map(LogEntry logEntry) { - LogDTO logDTO = new LogDTO(); - logDTO.loggerName = logEntry.getLoggerName(); - logDTO.level = logEntry.getLogLevel(); - logDTO.unixtime = logEntry.getTime(); - logDTO.timestamp = new Date(logEntry.getTime()); - logDTO.message = logEntry.getMessage(); - return logDTO; + private LogDTO map(LogEntry logEntry) { + return new LogDTO(logEntry.getSequence(), logEntry.getLoggerName(), logEntry.getLogLevel(), logEntry.getTime(), + logEntry.getMessage()); + } + + private void stopDeferredScheduledFuture() { + // Stop any existing scheduled commit + ScheduledFuture commitScheduledFuture = this.commitScheduledFuture; + if (commitScheduledFuture != null) { + commitScheduledFuture.cancel(false); + commitScheduledFuture = null; + } + } + + private synchronized void flush() { + stopDeferredScheduledFuture(); + + synchronized (deferredLogs) { + if (!deferredLogs.isEmpty()) { + try { + sendMessage(gson.toJson(deferredLogs)); + } catch (IOException e) { + } + + deferredLogs.clear(); + } + } } } diff --git a/bundles/org.openhab.core.io.websocket/src/main/java/org/openhab/core/io/websocket/log/LogWebSocketAdapter.java b/bundles/org.openhab.core.io.websocket/src/main/java/org/openhab/core/io/websocket/log/LogWebSocketAdapter.java index 3a245e4e5a2..93a763dc861 100644 --- a/bundles/org.openhab.core.io.websocket/src/main/java/org/openhab/core/io/websocket/log/LogWebSocketAdapter.java +++ b/bundles/org.openhab.core.io.websocket/src/main/java/org/openhab/core/io/websocket/log/LogWebSocketAdapter.java @@ -12,6 +12,7 @@ */ package org.openhab.core.io.websocket.log; +import java.util.Enumeration; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; @@ -23,6 +24,7 @@ import org.osgi.service.component.annotations.Component; import org.osgi.service.component.annotations.Deactivate; import org.osgi.service.component.annotations.Reference; +import org.osgi.service.log.LogEntry; import org.osgi.service.log.LogReaderService; import com.google.gson.Gson; @@ -50,14 +52,14 @@ public void deactivate() { webSockets.forEach(logReaderService::removeLogListener); } - public void registerListener(LogWebSocket eventWebSocket) { - webSockets.add(eventWebSocket); - logReaderService.addLogListener(eventWebSocket); + public void registerListener(LogWebSocket logWebSocket) { + webSockets.add(logWebSocket); + logReaderService.addLogListener(logWebSocket); } - public void unregisterListener(LogWebSocket eventWebSocket) { - logReaderService.removeLogListener(eventWebSocket); - webSockets.remove(eventWebSocket); + public void unregisterListener(LogWebSocket logWebSocket) { + logReaderService.removeLogListener(logWebSocket); + webSockets.remove(logWebSocket); } @Override @@ -70,4 +72,8 @@ public Object createWebSocket(ServletUpgradeRequest servletUpgradeRequest, ServletUpgradeResponse servletUpgradeResponse) { return new LogWebSocket(gson, LogWebSocketAdapter.this); } + + public Enumeration getLog() { + return logReaderService.getLog(); + } }