Skip to content

Commit

Permalink
Update logger to provide history and improve performance
Browse files Browse the repository at this point in the history
Signed-off-by: Chris Jackson <[email protected]>
  • Loading branch information
cdjackson committed Jan 20, 2025
1 parent ce37425 commit 003b999
Show file tree
Hide file tree
Showing 4 changed files with 237 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,34 @@
import java.util.Date;

import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.openhab.core.io.websocket.event.EventDTO;
import org.osgi.service.log.LogLevel;

/**
* The {@link EventDTO} is used for serialization and deserialization of events
* The {@link LogDTO} is used for serialization and deserialization of log messages
*
* @author Jan N. Klug - Initial contribution
* @author Chris Jackson - Add sequence and make Comparable based on sequence
*/
@NonNullByDefault
public class LogDTO {
public @Nullable String loggerName;
public @Nullable LogLevel level;
public @Nullable Date timestamp;
public class LogDTO implements Comparable<LogDTO> {
public String loggerName;
public LogLevel level;
public Date timestamp;
public long unixtime;
public @Nullable String message;
public String message;
public long sequence;

public LogDTO(long sequence, String loggerName, LogLevel level, long unixtime, String message) {
this.sequence = sequence;
this.loggerName = loggerName;
this.level = level;
this.timestamp = new Date(unixtime);
this.unixtime = unixtime;
this.message = message;
}

@Override
public int compareTo(LogDTO o) {
return (int) (sequence - o.sequence);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/**
* Copyright (c) 2010-2024 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.core.io.websocket.log;

import java.util.List;

/**
* The {@link LogFilterDTO} is used for serialization and deserialization of log filters
*
* @author Chris Jackson - Initial contribution
*/
public class LogFilterDTO {

public Long timeStart;
public Long timeStop;
public List<String> loggerNames;
public Long sequenceStart;
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,15 @@
package org.openhab.core.io.websocket.log;

import java.io.IOException;
import java.util.Date;
import java.util.List;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.Objects;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import javax.validation.constraints.Pattern;

import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
Expand All @@ -29,6 +33,7 @@
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.openhab.core.common.ThreadPoolManager;
import org.osgi.service.log.LogEntry;
import org.osgi.service.log.LogListener;
import org.slf4j.Logger;
Expand All @@ -38,10 +43,21 @@
import com.google.gson.JsonParseException;
import com.google.gson.reflect.TypeToken;

import antlr.collections.List;
import io.netty.util.concurrent.ScheduledFuture;

/**
* The {@link LogWebSocket} is the WebSocket implementation for logs
* The {@link LogWebSocket} is the WebSocket implementation for logs.
*
* This supports sending of history, and provides a method of managing message cadence.
* When a client connects, it must send a filter request before the server will send any logs. This triggers the sending
* of history.
*
* Live logs are sent as individual messages if they are received with sufficient spacing. When logs come in very
* quickly, they are clustered together and sent as an array after up to 100mS.
*
* @author Jan N. Klug - Initial contribution
* @author Chris Jackson - Add history and improve performance using arrays
*/
@WebSocket
@NonNullByDefault
Expand All @@ -51,26 +67,41 @@ public class LogWebSocket implements LogListener {
private static final TypeToken<List<String>> STRING_LIST_TYPE = (TypeToken<List<String>>) TypeToken
.getParameterized(List.class, String.class);

private final static int SEND_PERIOD = 100; // Minimum allowable time between log packets (in milliseconds)
private final static long FIRST_SEQUENCE = 0;

private final Logger logger = LoggerFactory.getLogger(LogWebSocket.class);

private final LogWebSocketAdapter wsAdapter;
private final Gson gson;

private @Nullable Session session;
private @Nullable RemoteEndpoint remoteEndpoint;
private String remoteIdentifier = "<unknown>";

private final ScheduledExecutorService scheduledExecutorService;
private @Nullable ScheduledFuture<?> commitScheduledFuture;

private long lastSentTime = 0;
private List<LogDTO> deferredLogs = new ArrayList<>();

private boolean enabled = false;
private long lastSequence = FIRST_SEQUENCE;

private List<Pattern> loggerPatterns = List.of();

public LogWebSocket(Gson gson, LogWebSocketAdapter wsAdapter) {
this.wsAdapter = wsAdapter;
this.gson = gson;

scheduledExecutorService = ThreadPoolManager.getScheduledPool("LogWebSocket");
}

@OnWebSocketClose
public void onClose(int statusCode, String reason) {
this.wsAdapter.unregisterListener(this);
remoteIdentifier = "<unknown>";
if (enabled) {
this.wsAdapter.unregisterListener(this);
}
stopDeferredScheduledFuture();
this.session = null;
this.remoteEndpoint = null;
}
Expand All @@ -80,23 +111,104 @@ public void onConnect(Session session) {
this.session = session;
RemoteEndpoint remoteEndpoint = session.getRemote();
this.remoteEndpoint = remoteEndpoint;
this.remoteIdentifier = remoteEndpoint.getInetSocketAddress().toString();
this.wsAdapter.registerListener(this);
}

@OnWebSocketMessage
public void onText(String message) {
// Detect empty message (keepalive) and ignore
if (message.equals("{}")) {
return;
}

// Defer sending live logs while we process the history
lastSentTime = Long.MAX_VALUE;
stopDeferredScheduledFuture();

// Enable log messages
if (!enabled) {
this.wsAdapter.registerListener(this);
enabled = true;
}

RemoteEndpoint remoteEndpoint = this.remoteEndpoint;
if (session == null || remoteEndpoint == null) {
// no connection or no remote endpoint , do nothing this is possible due to async behavior
return;
}

LogFilterDTO logFilterDto;
try {
loggerPatterns = gson.fromJson(message, STRING_LIST_TYPE).stream().map(Pattern::compile).toList();
logFilterDto = gson.fromJson(message, LogFilterDTO.class);
} catch (JsonParseException e) {
logger.warn("Failed to parse '{}' to a list of subscribed loggers", message);
logger.warn("Failed to parse '{}' to a valid log filter object", message);
return;
}

loggerPatterns = logFilterDto.loggerNames == null ? List.of()
: logFilterDto.loggerNames.stream().map(Pattern::compile).toList();

Long timeStart;
Long timeStop;
if (logFilterDto.timeStart != null) {
timeStart = logFilterDto.timeStart;
} else {
timeStart = Long.MIN_VALUE;
}
if (logFilterDto.timeStop != null) {
timeStop = logFilterDto.timeStop;
} else {
timeStop = Long.MAX_VALUE;
}

Long sequenceStart;
if (logFilterDto.sequenceStart != null) {
sequenceStart = logFilterDto.sequenceStart;
} else {
sequenceStart = lastSequence;
}

List<LogEntry> logs = new ArrayList<>();
for (Enumeration<LogEntry> history = wsAdapter.getLog(); history.hasMoreElements();) {
logs.add(history.nextElement());
}

if (logs.isEmpty()) {
lastSentTime = 0;
return;
}

Predicate<LogEntry> withinTimeRange = log -> (log.getTime() >= timeStart) && (log.getTime() <= timeStop);
Predicate<LogEntry> withinSequence = log -> log.getSequence() > sequenceStart;
Predicate<LogEntry> nameMatchesAnyPattern = log -> loggerPatterns.stream()
.anyMatch(pattern -> pattern.matcher(log.getLoggerName()).matches());

List<LogEntry> filteredEvents = logs.stream().filter(withinTimeRange.and(withinSequence))
.collect(Collectors.toList());
// List<LogEntry> filteredEvents = logs.stream().filter(withinTimeRange.and(nameMatchesAnyPattern))
// .collect(Collectors.toList());
List<LogDTO> dtoList = filteredEvents.stream().map(this::map).collect(Collectors.toList());
Collections.sort(dtoList);

try {
sendMessage(gson.toJson(dtoList));
} catch (IOException e) {
}
lastSentTime = System.currentTimeMillis();

// Remove any duplicates from the live log buffer
long newestSequence = logs.get(0).getSequence();
synchronized (deferredLogs) {
Iterator<LogDTO> iterator = deferredLogs.iterator();
while (iterator.hasNext()) {
LogDTO value = iterator.next();
if (value.sequence <= newestSequence) {
iterator.remove();
}
}
}

// Continue with live logs...
flush();
}

@OnWebSocketError
Expand All @@ -120,28 +232,64 @@ private synchronized void sendMessage(String message) throws IOException {

@Override
public void logged(@NonNullByDefault({}) LogEntry logEntry) {
if (!loggerPatterns.isEmpty() && loggerPatterns.stream().noneMatch(logMatch(logEntry))) {
if (!loggerPatterns.isEmpty() && loggerPatterns.stream().noneMatch(logPatternMatch(logEntry))) {
return;
}
try {
LogDTO logDTO = map(logEntry);
sendMessage(gson.toJson(logDTO));
} catch (IOException e) {
// Fail silently!

LogDTO logDTO = map(logEntry);
lastSequence = logEntry.getSequence();

// If the last message sent was less than SEND_PERIOD ago, then we just buffer
if (lastSentTime > System.currentTimeMillis() - SEND_PERIOD) {
// Start the timer if this is the first deferred log
synchronized (deferredLogs) {
if (deferredLogs.isEmpty()) {
commitScheduledFuture = scheduledExecutorService.schedule(this::flush,
lastSentTime + SEND_PERIOD - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}

deferredLogs.add(logDTO);
}
} else {
lastSentTime = System.currentTimeMillis();
try {
sendMessage(gson.toJson(logDTO));
} catch (IOException e) {
// Fail silently!
}
}
}

private static Predicate<Pattern> logMatch(LogEntry logEntry) {
private static Predicate<Pattern> logPatternMatch(LogEntry logEntry) {
return pattern -> pattern.matcher(logEntry.getLoggerName()).matches();
}

private static LogDTO map(LogEntry logEntry) {
LogDTO logDTO = new LogDTO();
logDTO.loggerName = logEntry.getLoggerName();
logDTO.level = logEntry.getLogLevel();
logDTO.unixtime = logEntry.getTime();
logDTO.timestamp = new Date(logEntry.getTime());
logDTO.message = logEntry.getMessage();
return logDTO;
private LogDTO map(LogEntry logEntry) {
return new LogDTO(logEntry.getSequence(), logEntry.getLoggerName(), logEntry.getLogLevel(), logEntry.getTime(),
logEntry.getMessage());
}

private void stopDeferredScheduledFuture() {
// Stop any existing scheduled commit
ScheduledFuture<?> commitScheduledFuture = this.commitScheduledFuture;
if (commitScheduledFuture != null) {
commitScheduledFuture.cancel(false);
commitScheduledFuture = null;
}
}

private synchronized void flush() {
stopDeferredScheduledFuture();

synchronized (deferredLogs) {
if (!deferredLogs.isEmpty()) {
try {
sendMessage(gson.toJson(deferredLogs));
} catch (IOException e) {
}

deferredLogs.clear();
}
}
}
}
Loading

0 comments on commit 003b999

Please sign in to comment.