Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update logger to provide history and improve performance #4570

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,34 @@
import java.util.Date;

import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.openhab.core.io.websocket.event.EventDTO;
import org.osgi.service.log.LogLevel;

/**
* The {@link EventDTO} is used for serialization and deserialization of events
* The {@link LogDTO} is used for serialization and deserialization of log messages
*
* @author Jan N. Klug - Initial contribution
* @author Chris Jackson - Add sequence and make Comparable based on sequence
*/
@NonNullByDefault
public class LogDTO {
public @Nullable String loggerName;
public @Nullable LogLevel level;
public @Nullable Date timestamp;
public class LogDTO implements Comparable<LogDTO> {
public String loggerName;
public LogLevel level;
public Date timestamp;
public long unixtime;
public @Nullable String message;
public String message;
public long sequence;

public LogDTO(long sequence, String loggerName, LogLevel level, long unixtime, String message) {
this.sequence = sequence;
this.loggerName = loggerName;
this.level = level;
this.timestamp = new Date(unixtime);
this.unixtime = unixtime;
this.message = message;
}

@Override
public int compareTo(LogDTO o) {
return (int) (sequence - o.sequence);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright (c) 2010-2025 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.core.io.websocket.log;

import java.util.List;

/**
* The {@link LogFilterDTO} is used for serialization and deserialization of log filters
*
* @author Chris Jackson - Initial contribution
*/
public class LogFilterDTO {

public Long timeStart;
public Long timeStop;
public List<String> loggerNames;
public Long sequenceStart;
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,18 @@
package org.openhab.core.io.websocket.log;

import java.io.IOException;
import java.util.Date;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
Expand All @@ -29,6 +36,7 @@
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.openhab.core.common.ThreadPoolManager;
import org.osgi.service.log.LogEntry;
import org.osgi.service.log.LogListener;
import org.slf4j.Logger;
Expand All @@ -39,9 +47,17 @@
import com.google.gson.reflect.TypeToken;

/**
* The {@link LogWebSocket} is the WebSocket implementation for logs
* The {@link LogWebSocket} is the WebSocket implementation for logs.
*
* This supports sending of history, and provides a method of managing message cadence.
* When a client connects, it must send a filter request before the server will send any logs. This triggers the sending
* of history.
*
* Live logs are sent as individual messages if they are received with sufficient spacing. When logs come in very
* quickly, they are clustered together and sent as an array after up to 100mS.
*
* @author Jan N. Klug - Initial contribution
* @author Chris Jackson - Add history and improve performance using arrays
*/
@WebSocket
@NonNullByDefault
Expand All @@ -51,26 +67,41 @@ public class LogWebSocket implements LogListener {
private static final TypeToken<List<String>> STRING_LIST_TYPE = (TypeToken<List<String>>) TypeToken
.getParameterized(List.class, String.class);

private final static int SEND_PERIOD = 100; // Minimum allowable time between log packets (in milliseconds)
private final static long FIRST_SEQUENCE = 0;

private final Logger logger = LoggerFactory.getLogger(LogWebSocket.class);

private final LogWebSocketAdapter wsAdapter;
private final Gson gson;

private @Nullable Session session;
private @Nullable RemoteEndpoint remoteEndpoint;
private String remoteIdentifier = "<unknown>";

private final ScheduledExecutorService scheduledExecutorService;
private @Nullable ScheduledFuture<?> commitScheduledFuture;

private long lastSentTime = 0;
private List<LogDTO> deferredLogs = new ArrayList<>();

private boolean enabled = false;
private long lastSequence = FIRST_SEQUENCE;

private List<Pattern> loggerPatterns = List.of();

public LogWebSocket(Gson gson, LogWebSocketAdapter wsAdapter) {
this.wsAdapter = wsAdapter;
this.gson = gson;

scheduledExecutorService = ThreadPoolManager.getScheduledPool("LogWebSocket");
}

@OnWebSocketClose
public void onClose(int statusCode, String reason) {
this.wsAdapter.unregisterListener(this);
remoteIdentifier = "<unknown>";
if (enabled) {
this.wsAdapter.unregisterListener(this);
}
stopDeferredScheduledFuture();
this.session = null;
this.remoteEndpoint = null;
}
Expand All @@ -80,23 +111,104 @@ public void onConnect(Session session) {
this.session = session;
RemoteEndpoint remoteEndpoint = session.getRemote();
this.remoteEndpoint = remoteEndpoint;
this.remoteIdentifier = remoteEndpoint.getInetSocketAddress().toString();
this.wsAdapter.registerListener(this);
}

@OnWebSocketMessage
public void onText(String message) {
// Detect empty message (keepalive) and ignore
if (message.equals("{}")) {
return;
}

// Defer sending live logs while we process the history
lastSentTime = Long.MAX_VALUE;
stopDeferredScheduledFuture();

// Enable log messages
if (!enabled) {
this.wsAdapter.registerListener(this);
enabled = true;
}

RemoteEndpoint remoteEndpoint = this.remoteEndpoint;
if (session == null || remoteEndpoint == null) {
// no connection or no remote endpoint , do nothing this is possible due to async behavior
return;
}

LogFilterDTO logFilterDto;
try {
loggerPatterns = gson.fromJson(message, STRING_LIST_TYPE).stream().map(Pattern::compile).toList();
logFilterDto = gson.fromJson(message, LogFilterDTO.class);
} catch (JsonParseException e) {
logger.warn("Failed to parse '{}' to a list of subscribed loggers", message);
logger.warn("Failed to parse '{}' to a valid log filter object", message);
return;
}

loggerPatterns = logFilterDto.loggerNames == null ? List.of()
: logFilterDto.loggerNames.stream().map(Pattern::compile).toList();

Long timeStart;
Long timeStop;
if (logFilterDto.timeStart != null) {
timeStart = logFilterDto.timeStart;
} else {
timeStart = Long.MIN_VALUE;
}
if (logFilterDto.timeStop != null) {
timeStop = logFilterDto.timeStop;
} else {
timeStop = Long.MAX_VALUE;
}

Long sequenceStart;
if (logFilterDto.sequenceStart != null) {
sequenceStart = logFilterDto.sequenceStart;
} else {
sequenceStart = lastSequence;
}

List<LogEntry> logs = new ArrayList<>();
for (Enumeration<LogEntry> history = wsAdapter.getLog(); history.hasMoreElements();) {
logs.add(history.nextElement());
}

if (logs.isEmpty()) {
lastSentTime = 0;
return;
}

Predicate<LogEntry> withinTimeRange = log -> (log.getTime() >= timeStart) && (log.getTime() <= timeStop);
Predicate<LogEntry> withinSequence = log -> log.getSequence() > sequenceStart;
Predicate<LogEntry> nameMatchesAnyPattern = log -> loggerPatterns.stream()
.anyMatch(pattern -> pattern.matcher(log.getLoggerName()).matches());

List<LogEntry> filteredEvents = logs.stream().filter(withinTimeRange.and(withinSequence))
.collect(Collectors.toList());
// List<LogEntry> filteredEvents = logs.stream().filter(withinTimeRange.and(nameMatchesAnyPattern))
// .collect(Collectors.toList());
List<LogDTO> dtoList = filteredEvents.stream().map(this::map).collect(Collectors.toList());
Collections.sort(dtoList);

try {
sendMessage(gson.toJson(dtoList));
} catch (IOException e) {
}
lastSentTime = System.currentTimeMillis();

// Remove any duplicates from the live log buffer
long newestSequence = logs.get(0).getSequence();
synchronized (deferredLogs) {
Iterator<LogDTO> iterator = deferredLogs.iterator();
while (iterator.hasNext()) {
LogDTO value = iterator.next();
if (value.sequence <= newestSequence) {
iterator.remove();
}
}
}

// Continue with live logs...
flush();
}

@OnWebSocketError
Expand All @@ -120,28 +232,64 @@ private synchronized void sendMessage(String message) throws IOException {

@Override
public void logged(@NonNullByDefault({}) LogEntry logEntry) {
if (!loggerPatterns.isEmpty() && loggerPatterns.stream().noneMatch(logMatch(logEntry))) {
if (!loggerPatterns.isEmpty() && loggerPatterns.stream().noneMatch(logPatternMatch(logEntry))) {
return;
}
try {
LogDTO logDTO = map(logEntry);
sendMessage(gson.toJson(logDTO));
} catch (IOException e) {
// Fail silently!

LogDTO logDTO = map(logEntry);
lastSequence = logEntry.getSequence();

// If the last message sent was less than SEND_PERIOD ago, then we just buffer
if (lastSentTime > System.currentTimeMillis() - SEND_PERIOD) {
// Start the timer if this is the first deferred log
synchronized (deferredLogs) {
if (deferredLogs.isEmpty()) {
commitScheduledFuture = scheduledExecutorService.schedule(this::flush,
lastSentTime + SEND_PERIOD - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}

deferredLogs.add(logDTO);
}
} else {
lastSentTime = System.currentTimeMillis();
try {
sendMessage(gson.toJson(logDTO));
} catch (IOException e) {
// Fail silently!
}
}
}

private static Predicate<Pattern> logMatch(LogEntry logEntry) {
private static Predicate<Pattern> logPatternMatch(LogEntry logEntry) {
return pattern -> pattern.matcher(logEntry.getLoggerName()).matches();
}

private static LogDTO map(LogEntry logEntry) {
LogDTO logDTO = new LogDTO();
logDTO.loggerName = logEntry.getLoggerName();
logDTO.level = logEntry.getLogLevel();
logDTO.unixtime = logEntry.getTime();
logDTO.timestamp = new Date(logEntry.getTime());
logDTO.message = logEntry.getMessage();
return logDTO;
private LogDTO map(LogEntry logEntry) {
return new LogDTO(logEntry.getSequence(), logEntry.getLoggerName(), logEntry.getLogLevel(), logEntry.getTime(),
logEntry.getMessage());
}

private void stopDeferredScheduledFuture() {
// Stop any existing scheduled commit
ScheduledFuture<?> commitScheduledFuture = this.commitScheduledFuture;
if (commitScheduledFuture != null) {
commitScheduledFuture.cancel(false);
commitScheduledFuture = null;
}
}

private synchronized void flush() {
stopDeferredScheduledFuture();

synchronized (deferredLogs) {
if (!deferredLogs.isEmpty()) {
try {
sendMessage(gson.toJson(deferredLogs));
} catch (IOException e) {
}

deferredLogs.clear();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
*/
package org.openhab.core.io.websocket.log;

import java.util.Enumeration;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

Expand All @@ -23,6 +24,7 @@
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.log.LogEntry;
import org.osgi.service.log.LogReaderService;

import com.google.gson.Gson;
Expand Down Expand Up @@ -50,14 +52,14 @@ public void deactivate() {
webSockets.forEach(logReaderService::removeLogListener);
}

public void registerListener(LogWebSocket eventWebSocket) {
webSockets.add(eventWebSocket);
logReaderService.addLogListener(eventWebSocket);
public void registerListener(LogWebSocket logWebSocket) {
webSockets.add(logWebSocket);
logReaderService.addLogListener(logWebSocket);
}

public void unregisterListener(LogWebSocket eventWebSocket) {
logReaderService.removeLogListener(eventWebSocket);
webSockets.remove(eventWebSocket);
public void unregisterListener(LogWebSocket logWebSocket) {
logReaderService.removeLogListener(logWebSocket);
webSockets.remove(logWebSocket);
}

@Override
Expand All @@ -70,4 +72,8 @@ public Object createWebSocket(ServletUpgradeRequest servletUpgradeRequest,
ServletUpgradeResponse servletUpgradeResponse) {
return new LogWebSocket(gson, LogWebSocketAdapter.this);
}

public Enumeration<LogEntry> getLog() {
return logReaderService.getLog();
}
}