diff --git a/.gitignore b/.gitignore index c2bdca8..a760e43 100644 --- a/.gitignore +++ b/.gitignore @@ -1,11 +1,27 @@ -# Misc -.DS_Store - # Gradle -.gradle -build +**/.gradle +**/build + +# Misc +**.bak +**.class +**.iml +**.old +**/.classpath +**/.DS_Store +**/.idea +**/.metadata +**/.project +**/.settings +**/eclipseBin +**/libJar +**/out +**/target +**/bin -# Project files -*.iml -.idea -/bin/ +# Runtime files +**/.key +db +dslink.properties +nodes.json +nodes.json.bak diff --git a/build.gradle b/build.gradle index 03fca10..d375777 100644 --- a/build.gradle +++ b/build.gradle @@ -4,25 +4,22 @@ apply plugin: 'java-library' mainClassName = 'org.dsa.iot.etsdb.Main' sourceCompatibility = 1.7 targetCompatibility = 1.7 -version = '0.19.6' +version = '0.19.7' repositories { mavenLocal() jcenter() - maven { - url 'https://oss.sonatype.org/content/repositories/snapshots/' - } } wrapper { - gradleVersion = '6.1' + gradleVersion = '6.2.2' } dependencies { compileOnly 'com.google.code.findbugs:annotations:[3.0.1,)' - api 'org.iot-dsa:commons:0.23.1' - api 'org.iot-dsa:dslink:0.23.1' - api 'org.iot-dsa:historian:0.23.1' + api 'org.iot-dsa:commons:0.23.2' + api 'org.iot-dsa:dslink:0.23.2' + api 'org.iot-dsa:historian:0.23.2' } run { diff --git a/dslink.json b/dslink.json index 1c5ae4e..3565e55 100644 --- a/dslink.json +++ b/dslink.json @@ -1,11 +1,10 @@ { "name": "dslink-java-etsdb", - "version": "0.19.6", + "version": "0.19.7", "description": "Historian DSLink implementation for ETSDB", "license": "Apache", "author": { - "name": "Samuel Grenier", - "email": "samrg472@gmail.com" + "name": "Samuel Grenier" }, "main": "bin/dslink-java-etsdb", "repository": { diff --git a/src/main/java/org/etsdb/impl/ChecksumInputStream.java b/src/main/java/org/etsdb/impl/ChecksumInputStream.java index 7994972..06818ff 100644 --- a/src/main/java/org/etsdb/impl/ChecksumInputStream.java +++ b/src/main/java/org/etsdb/impl/ChecksumInputStream.java @@ -1,8 +1,14 @@ package org.etsdb.impl; -import java.io.*; +import java.io.BufferedInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; class ChecksumInputStream extends InputStream implements ChecksumInput { + private final InputStream delegate; private byte sum; @@ -15,7 +21,7 @@ class ChecksumInputStream extends InputStream implements ChecksumInput { ChecksumInputStream(File dataFile) { if (dataFile.exists()) { try { - this.delegate = new BufferedInputStream(new FileInputStream(dataFile)); + this.delegate = new BufferedInputStream(new FileInputStream(dataFile), 32768); } catch (FileNotFoundException e) { // This should not happen because we just checked that the file exists. throw new RuntimeException(e); @@ -28,8 +34,9 @@ class ChecksumInputStream extends InputStream implements ChecksumInput { @Override public boolean checkSum() throws IOException { - if (eof) + if (eof) { return false; + } byte b = (byte) delegate.read(); position++; @@ -40,13 +47,14 @@ public boolean checkSum() throws IOException { @Override public int read() throws IOException { - if (eof) + if (eof) { return -1; + } int i = delegate.read(); - if (i == -1) + if (i == -1) { eof = true; - else { + } else { position++; sum += (byte) i; } @@ -56,16 +64,18 @@ public int read() throws IOException { @Override public int read(byte[] b, int off, int len) throws IOException { - if (eof) + if (eof) { return -1; + } int count = delegate.read(b, off, len); - if (count == -1) + if (count == -1) { eof = true; - else { + } else { position += count; - for (int i = 0; i < count; i++) + for (int i = 0; i < count; i++) { sum += b[i + off]; + } } return count; } @@ -116,7 +126,8 @@ public long skip(long n) throws IOException { @Override public void close() throws IOException { - if (delegate != null) + if (delegate != null) { delegate.close(); + } } } diff --git a/src/main/java/org/etsdb/impl/CorruptionScanner.java b/src/main/java/org/etsdb/impl/CorruptionScanner.java index e37f2b4..b79bd06 100644 --- a/src/main/java/org/etsdb/impl/CorruptionScanner.java +++ b/src/main/java/org/etsdb/impl/CorruptionScanner.java @@ -1,18 +1,27 @@ package org.etsdb.impl; +import org.dsa.iot.shared.SharedObjects; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.*; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; class CorruptionScanner { + private static final List emptyList = Collections.emptyList(); private static final Logger logger = LoggerFactory.getLogger(CorruptionScanner.class.getName()); private final DatabaseImpl db; + private final AtomicInteger threads = new AtomicInteger(0); CorruptionScanner(DatabaseImpl db) { this.db = db; @@ -31,11 +40,11 @@ public static void main(String[] args) throws IOException { } } - void scan() throws IOException { + void scan() { scan(db.getBaseDir()); } - private void scan(File parent) throws IOException { + private void scan(File parent) { File[] subdirs = parent.listFiles(); if (subdirs != null) { for (File subdir : subdirs) { @@ -48,9 +57,20 @@ private void scan(File parent) throws IOException { } } } + synchronized (this) { + while (threads.get() > 0) { + try { + wait(1000); + } catch (InterruptedException ignore) { + } + } + } } - private void deepScan(File parent) throws IOException { + private void deepScan(File parent) { + if (!parent.isDirectory()) { + return; + } File[] subdirs = parent.listFiles(); if (subdirs != null) { for (File subdir : subdirs) { @@ -60,80 +80,117 @@ private void deepScan(File parent) throws IOException { } } - private void checkSeriesDir(File seriesDir) throws IOException { + private void checkSeriesDir(final File seriesDir) { if (!seriesDir.isDirectory()) { return; } - String seriesId = seriesDir.getName(); + File[] files = seriesDir.listFiles(); + if ((files == null) || (files.length == 0)) { + return; + } + final List temps = getFiles(files, ".temp"); + final List datas = getFiles(files, ".data"); + final List metas = getFiles(files, ".meta"); + if (temps.isEmpty() && datas.isEmpty() && metas.isEmpty()) { + return; + } + threads.incrementAndGet(); + SharedObjects.getDaemonThreadPool().execute(new Runnable() { + @Override + public void run() { + try { + checkSeriesDir(seriesDir, temps, datas, metas); + } catch (Exception x) { + logger.error(seriesDir.getPath(), x); + } finally { + threads.decrementAndGet(); + synchronized (CorruptionScanner.this) { + CorruptionScanner.this.notify(); + } + } + } + }); + } + private void checkSeriesDir(File seriesDir, + List temps, + List datas, + List metas) throws IOException { // temp files. - for (File temp : getFiles(seriesDir, ".temp")) { - long shardId = Utils.getShardId(temp.getName(), 10); - File data = new File(seriesDir, shardId + ".data"); - File meta = new File(seriesDir, shardId + ".meta"); - - if (data.exists()) { - // If the data file exists, then just delete the file - logger.warn("Found temp file " + temp + " with existing data file. Deleting."); - Utils.deleteWithRetry(temp); - } else if (meta.exists()) { - // If the meta file exists, then rename the temp file to data, and delete the meta file so that it gets - // recreated. - logger.warn("Found temp file " + temp + " without data but with meta file. Moving."); - Utils.renameWithRetry(temp, data); - Utils.deleteWithRetry(meta); - } else if (temp.length() > 0) { - // A lonely temp file, but with content. Rename to data and see wht the corruption check has to say. - logger.warn("Found temp file " + temp + " without data or meta file, with content. Moving."); - Utils.deleteWithRetry(temp); - } else { - // Otherwise, just delete the temp file. - logger.warn("Found temp file " + temp + " without data, meta file, or content. Deleting."); - Utils.deleteWithRetry(temp); + if (!temps.isEmpty()) { + for (File temp : temps) { + long shardId = Utils.getShardId(temp.getName(), 10); + File data = new File(seriesDir, shardId + ".data"); + File meta = new File(seriesDir, shardId + ".meta"); + + if (data.exists()) { + // If the data file exists, then just delete the file + logger.warn("Found temp file " + temp + " with existing data file. Deleting."); + Utils.deleteWithRetry(temp); + } else if (meta.exists()) { + // If the meta file exists, then rename the temp file to data, and delete the meta file so that it gets + // recreated. + logger.warn( + "Found temp file " + temp + + " without data but with meta file. Moving."); + Utils.renameWithRetry(temp, data); + Utils.deleteWithRetry(meta); + } else { + // Otherwise, just delete the temp file. + logger.warn("Found temp file " + temp + + " without data, meta file, or content. Deleting."); + Utils.deleteWithRetry(temp); + } } } - // Ensure there is a meta file for every data file and vice versa. - List datas = getFiles(seriesDir, ".data"); - List metas = getFiles(seriesDir, ".meta"); - - for (File data : datas) { - long shardId = Utils.getShardId(data.getName()); - boolean found = false; - for (int i = metas.size() - 1; i >= 0; i--) { - File meta = metas.get(i); - if (Utils.getShardId(meta.getName()) == shardId) { - metas.remove(i); - found = true; - break; + if (!datas.isEmpty()) { + for (File data : datas) { + long shardId = Utils.getShardId(data.getName()); + boolean found = false; + for (int i = metas.size() - 1; i >= 0; i--) { + File meta = metas.get(i); + if (Utils.getShardId(meta.getName()) == shardId) { + metas.remove(i); + found = true; + break; + } } - } - if (!found) { - logger.warn("Data file without meta file in series " + seriesId + ", shard " + shardId + "."); - // Don't need to recreate the meta file here. The - // DataShard shard = new DataShard(seriesDir, seriesId, shardId); - // shard.close(); + if (!found) { + logger.warn("Data file without meta file in series " + seriesDir.getName() + + ", shard " + shardId + "."); + } } } // If there are any files left in the meta list, then they should just be deleted. - for (File meta : metas) { - logger.warn("Meta file without data file at " + meta + ". Deleting file"); - Utils.deleteWithRetry(meta); + if (!metas.isEmpty()) { + for (File meta : metas) { + logger.warn("Meta file without data file at " + meta + ". Deleting file"); + Utils.deleteWithRetry(meta); + } } - // Check data files for corruption. - for (File data : datas) { - checkFile(data); + if (!datas.isEmpty()) { + for (File data : datas) { + checkFile(data); + } } } - private List getFiles(File dir, String suffix) { - List result = new ArrayList<>(); - File[] files = dir.listFiles(new SuffixFilter(suffix)); - if (files != null) { - Collections.addAll(result, files); + private List getFiles(File[] files, String suffix) { + List result = null; + for (File file : files) { + if (!file.isDirectory() && file.getName().endsWith(suffix)) { + if (result == null) { + result = new ArrayList<>(); + } + result.add(file); + } + } + if (result == null) { + return emptyList; } return result; } @@ -148,7 +205,8 @@ private void checkFile(File data) throws IOException { } // If any corruption was found, delete the meta file so that it gets recreated. - Utils.deleteWithRetry(new File(data.getParent(), Utils.getShardId(data.getName()) + ".meta")); + Utils.deleteWithRetry( + new File(data.getParent(), Utils.getShardId(data.getName()) + ".meta")); logger.warn("Corruption detected in " + data + " at position " + position); fixCorruption(data, position); @@ -291,7 +349,8 @@ private void cut(File data, long from, long to) throws IOException { Utils.renameWithRetry(temp, data); } - private void copy(InputStream in, OutputStream out, long length, byte[] buf) throws IOException { + private void copy(InputStream in, OutputStream out, long length, byte[] buf) + throws IOException { while (length > 0) { int chunk = buf.length; if (length < buf.length) { @@ -306,17 +365,4 @@ private void copy(InputStream in, OutputStream out, long length, byte[] buf) thr } } - static class SuffixFilter implements FilenameFilter { - - private final String suffix; - - SuffixFilter(String suffix) { - this.suffix = suffix; - } - - @Override - public boolean accept(File dir, String name) { - return name.endsWith(suffix); - } - } } diff --git a/src/main/java/org/etsdb/impl/DatabaseImpl.java b/src/main/java/org/etsdb/impl/DatabaseImpl.java index b5d6288..aba86ed 100644 --- a/src/main/java/org/etsdb/impl/DatabaseImpl.java +++ b/src/main/java/org/etsdb/impl/DatabaseImpl.java @@ -4,7 +4,13 @@ import org.dsa.iot.dslink.util.handler.Handler; import org.dsa.iot.etsdb.utils.atomic.NotifyAtomicInteger; import org.dsa.iot.etsdb.utils.atomic.NotifyAtomicLong; -import org.etsdb.*; +import org.etsdb.ByteArrayBuilder; +import org.etsdb.Database; +import org.etsdb.DbConfig; +import org.etsdb.EtsdbException; +import org.etsdb.QueryCallback; +import org.etsdb.Serializer; +import org.etsdb.TimeRange; import org.etsdb.util.DirectoryUtils; import org.etsdb.util.EventHistogram; import org.slf4j.Logger; @@ -12,13 +18,18 @@ import java.io.File; import java.io.IOException; -import java.util.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; /** - * * @param the class of data that is written to this database. * @author Matthew Lohbihler */ @@ -122,13 +133,9 @@ private void open() { DBProperties props = getProperties(); if (!props.getBoolean("clean", false)) { if (config.isRunCorruptionScan()) { - try { - long start = System.currentTimeMillis(); - new CorruptionScanner(this).scan(); - logger.info("Corruption scan took " + (System.currentTimeMillis() - start) + "ms"); - } catch (IOException e) { - throw new EtsdbException(e); - } + long start = System.currentTimeMillis(); + new CorruptionScanner(this).scan(); + logger.info("Corruption scan took " + (System.currentTimeMillis() - start) + "ms"); } } else { if (config.isRunCorruptionScan()) { @@ -243,12 +250,14 @@ public void query(String seriesId, long fromTs, long toTs, final QueryCallback cb) { + public void query(String seriesId, long fromTs, long toTs, int limit, + final QueryCallback cb) { query(seriesId, fromTs, toTs, limit, false, cb); } @Override - public void query(String seriesId, long fromTs, long toTs, int limit, boolean reverse, final QueryCallback cb) { + public void query(String seriesId, long fromTs, long toTs, int limit, boolean reverse, + final QueryCallback cb) { lockConcurrent(); try { Series series = getSeries(seriesId); diff --git a/src/main/java/org/etsdb/impl/Janitor.java b/src/main/java/org/etsdb/impl/Janitor.java index 99bad1e..5fa9977 100644 --- a/src/main/java/org/etsdb/impl/Janitor.java +++ b/src/main/java/org/etsdb/impl/Janitor.java @@ -8,6 +8,7 @@ class Janitor implements Runnable { + static final Logger logger = LoggerFactory.getLogger(Janitor.class.getName()); private Handler handler; @@ -55,7 +56,7 @@ void initiate() { running = true; - thread = new Thread(this, "ETSDB Maintenance"); + thread = new Thread(this, "Maintenance " + db.getBaseDir().getPath()); //thread.setDaemon(true); thread.setPriority(Thread.MAX_PRIORITY - 1); thread.start(); @@ -80,8 +81,9 @@ public void run() { private void runImpl() { long next = nextFileLockCheck; - if (next > nextFlush) + if (next > nextFlush) { next = nextFlush; + } long sleep = next - System.currentTimeMillis(); if (sleep > 0) { @@ -96,8 +98,9 @@ private void runImpl() { } } - if (!running) + if (!running) { return; + } long now = System.currentTimeMillis(); if (now >= nextFileLockCheck) { @@ -114,8 +117,10 @@ private void runImpl() { // A GC is required for the mapped buffers to be closed. if (running) { if (db.tooManyFiles() || fileClosures > db.maxOpenFiles / 2) { - if (logger.isDebugEnabled()) - logger.debug("Running garbage collection. Files to close: " + fileClosures); + if (logger.isDebugEnabled()) { + logger.debug( + "Running garbage collection. Files to close: " + fileClosures); + } System.gc(); gc = true; db.openFiles.addAndGet(-fileClosures); @@ -135,8 +140,10 @@ private void runImpl() { if (logger.isDebugEnabled()) { logger.debug("Write queue flush took " + time + " ms"); - logger.debug("write/s=" + db.getWritesPerSecond() + ", backdateCount=" + db.getBackdateCount() - + ", writeCount=" + db.getWriteCount() + ", openFiles=" + db.getOpenFiles() + ", forcedClose=" + logger.debug("write/s=" + db.getWritesPerSecond() + ", backdateCount=" + + db.getBackdateCount() + + ", writeCount=" + db.getWriteCount() + ", openFiles=" + + db.getOpenFiles() + ", forcedClose=" + db.getForcedClose()); } @@ -145,10 +152,11 @@ private void runImpl() { // sleep time exceed the flush interval * 4. time *= 10; - if (gc || time < flushInterval) + if (gc || time < flushInterval) { time = flushInterval; - else if (time > flushInterval * 4) + } else if (time > flushInterval * 4) { time = flushInterval * 4; + } nextFlush = System.currentTimeMillis() + time; } } diff --git a/src/main/java/org/etsdb/impl/Utils.java b/src/main/java/org/etsdb/impl/Utils.java index 729f46c..a8dd2c5 100644 --- a/src/main/java/org/etsdb/impl/Utils.java +++ b/src/main/java/org/etsdb/impl/Utils.java @@ -3,11 +3,16 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.*; +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.text.SimpleDateFormat; import java.util.Date; public class Utils { + // This value must have at least one byte. static final byte[] SAMPLE_HEADER = {(byte) 0xfe, (byte) 0xed}; @@ -35,8 +40,9 @@ public static File getSeriesDir(File baseDir, String seriesId) { */ public static long getShardDirectory(String seriesId) { int i = seriesId.hashCode(); - if (i < 0) + if (i < 0) { i = -i; + } return i % 100; } @@ -62,15 +68,17 @@ public static long getShardId(String filename, int suffixLength) { * Returns the offset of the given ts within the given shard. * * @param shardId ID of the shard - * @param ts Timestamp + * @param ts Timestamp * @return offset */ public static long getOffsetInShard(long shardId, long ts) { long tsShardId = getShardId(ts); - if (tsShardId < shardId) + if (tsShardId < shardId) { return 0; - if (tsShardId == shardId) + } + if (tsShardId == shardId) { return getSampleOffset(ts); + } return 0x40000000; } @@ -90,8 +98,9 @@ public static long getTimestamp(long shardFile, long offset) { public static void closeQuietly(Closeable c) { try { - if (c != null) + if (c != null) { c.close(); + } } catch (IOException e) { logger.warn("Exception during close", e); } @@ -110,8 +119,9 @@ public static void writeCompactInt(OutputStream out, int i) throws IOException { } public static void putCompactLong(OutputStream out, long l) throws IOException { - if (l < 0) + if (l < 0) { throw new IllegalArgumentException("Cannot store negative numbers"); + } while (true) { if (l < 128) { out.write((byte) l); @@ -131,11 +141,13 @@ public static long readCompactLong(Input in) throws IOException { int count = 0; while (true) { long l = in.read(); - if (l == -1) + if (l == -1) { throw new IOException("EOF"); + } result |= (l & 0x7f) << (count++ * 7); - if (l < 128) + if (l < 128) { break; + } } return result; } @@ -148,8 +160,9 @@ public static boolean skip(InputStream in, long n) throws IOException { in.mark(1); int i = in.read(); in.reset(); - if (i == -1) + if (i == -1) { return true; + } } n -= skipped; } @@ -158,20 +171,24 @@ public static boolean skip(InputStream in, long n) throws IOException { } public static void deleteWithRetry(File file) throws IOException { - if (!file.exists()) + if (!file.exists()) { return; + } int retries = FILE_IO_RETRIES; while (true) { - if (file.delete()) + if (file.delete()) { break; + } - if (retries == 0) + if (retries == 0) { throw new IOException("Failed to delete " + file); + } retries--; - if (logger.isDebugEnabled()) + if (logger.isDebugEnabled()) { logger.debug("Failed to delete " + file + ", " + retries + " retries left"); + } sleep(FILE_IO_RETRIES - retries); } } @@ -179,21 +196,26 @@ public static void deleteWithRetry(File file) throws IOException { public static void renameWithRetry(File from, File to) throws IOException { int retries = FILE_IO_RETRIES; while (true) { - if (from.renameTo(to)) + if (from.renameTo(to)) { break; - if (retries == 0) + } + if (retries == 0) { throw new IOException("Failed to rename " + from + " to " + to); + } retries--; - if (logger.isDebugEnabled()) - logger.debug("Failed to rename " + from + " to " + to + ", " + retries + " retries left"); + if (logger.isDebugEnabled()) { + logger.debug("Failed to rename " + from + " to " + to + ", " + retries + + " retries left"); + } sleep(FILE_IO_RETRIES - retries); } } public static void delete(File file) throws IOException { - if (!file.exists()) + if (!file.exists()) { return; + } String message = null; while (true) { @@ -202,14 +224,16 @@ public static void delete(File file) throws IOException { break; } catch (IOException e) { if (e.getMessage() != null) { - if (message == null) + if (message == null) { message = e.getMessage(); - else { - if (e.getMessage().equals(message)) + } else { + if (e.getMessage().equals(message)) { throw e; + } } - } else + } else { throw e; + } } } } @@ -224,31 +248,35 @@ private static void _delete(File file) throws IOException { } } - if (!file.delete()) + if (!file.delete()) { throw new IOException("Failed to delete " + file); + } } - public static void deleteEmptyDirs(File file) { + /** + * Returns true if the given directory was deleted. + */ + public static boolean deleteEmptyDirs(File file) { if (file.isDirectory()) { // Recurse to leaves. File[] files = file.listFiles(); + boolean empty = true; if (files != null && files.length > 0) { - boolean hasFiles = false; for (File subFile : files) { - if (!subFile.isDirectory()) - hasFiles = true; - deleteEmptyDirs(subFile); + if (!deleteEmptyDirs(subFile)) { + empty = false; + } } - if (!hasFiles) - files = file.listFiles(); } - - if (files == null || files.length == 0) { + if (empty) { if (!file.delete()) { logger.error("Failed to delete empty dir: {}", file.getPath()); + return false; } + return true; } } + return false; } public static void write4ByteUnsigned(OutputStream out, long l) throws IOException { @@ -269,18 +297,21 @@ public static long read4ByteUnsigned(Input in) throws IOException { public static void sleep(int time) { try { - if (time > 0) + if (time > 0) { Thread.sleep(time); + } } catch (InterruptedException e) { // Ignore } } public static int compareLong(long l1, long l2) { - if (l1 < l2) + if (l1 < l2) { return -1; - if (l1 == l2) + } + if (l1 == l2) { return 0; + } return 1; }