diff --git a/artemis-journal/pom.xml b/artemis-journal/pom.xml index f31e8b66726..79c17eeaf44 100644 --- a/artemis-journal/pom.xml +++ b/artemis-journal/pom.xml @@ -90,5 +90,37 @@ ${project.version} test + + + com.seeburger.common + storage-client + 1.106.0 + provided + + + com.seeburger.common + storage-common + 1.106.0 + provided + + + + vfs-client-api + com.seeburger.vfs + 1.79.0 + provided + + + recovery + com.seeburger.bis.base + 1.25.9 + provided + + + auth + com.seeburger.security + 1.21.12 + provided + diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/datastore/DataStoreSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/datastore/DataStoreSequentialFile.java new file mode 100644 index 00000000000..eb87d4fdd2b --- /dev/null +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/datastore/DataStoreSequentialFile.java @@ -0,0 +1,668 @@ +/* + * DataStoreSequentialFile.java + * + * created at 2023-07-17 by f.gervasi f.gervasi@seeburger.de + * + * Copyright (c) SEEBURGER AG, Germany. All Rights Reserved. + */ +package org.apache.activemq.artemis.core.io.datastore; + + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.lang.invoke.MethodHandles; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.ActiveMQBuffers; +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.core.io.DummyCallback; +import org.apache.activemq.artemis.core.io.IOCallback; +import org.apache.activemq.artemis.core.io.SequentialFile; +import org.apache.activemq.artemis.core.io.buffer.TimedBuffer; +import org.apache.activemq.artemis.core.journal.EncodingSupport; +import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.seeburger.storage.api.Storage; +import com.seeburger.storage.api.StorageException; +import com.seeburger.storage.api.StorageFile; +import com.seeburger.storage.service.StorageService; + + +public class DataStoreSequentialFile implements SequentialFile +{ + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private Storage storage; + private StorageFile storageFile; + private TimedBuffer timedBuffer; + private AtomicBoolean isOpen = new AtomicBoolean(false); + private AtomicLong readPosition = new AtomicLong(0); + private final Object tempDataSemaphore = new Object(); + private boolean isAlreadyLookingForStorage = false; + private OutputStream storageFileOut; + private InputStream storageFileIn; + private ByteArrayOutputStream tempData; + private int bytesReceived; + private int syncCallCounter; + private List filesToBeRemoved = new ArrayList<>(); + private Path fileSizeFile; + private String fileName; + private String fileId; + private DataStoreSequentialFileFactory factory; + + public DataStoreSequentialFile(String fileName, String fileId, DataStoreSequentialFileFactory factory) + { + this.fileName = fileName; + this.fileId = fileId; + this.factory = factory; + createFileSizeFile(); + if (parseFileSizeFile().get(fileId) == null) + { + writeSizeToFileSizeFile(0); + } + } + + + @Override + public boolean isOpen() + { + return isOpen.get(); + } + + + @Override + public boolean exists() + { + try + { + return storageFile != null && storage.isFileExist(storageFile.getId(), false); + } + catch (StorageException e) + { + LOG.error("Failed to check whether file exists or not.", e); + } + return false; + } + + + @Override + public void open() + throws Exception + { + open(1, false); + } + + + @Override + public void open(int maxIO, boolean useExecutor) + throws Exception + { + LOG.info("Opened file {}", fileId); + getStorage(); + if (storage != null) + { + if (!storage.isFileExist(fileId, false)) + { + storageFile = storage.createFile(fileId); + } + storageFileIn = storage.getInputStream(storageFile.getId()); + storageFileOut = storage.getOutputStream(storageFile); + } + else + { + if (tempData == null) + { + tempData = new ByteArrayOutputStream(); + } + } + isOpen.set(true); + } + + + @Override + public ByteBuffer map(int position, long size) + throws IOException + { + throw new UnsupportedOperationException("Cannot map file."); + } + + + @Override + public boolean fits(int size) + { + return true; + } + + + @Override + public int calculateBlockStart(int position) + throws Exception + { + return position; + } + + + @Override + public String getFileName() + { + return fileName; + } + + + @Override + public void fill(int size) + throws Exception + { + // we do not need to fill anything + } + + + @Override + public void delete() + throws IOException, InterruptedException, ActiveMQException + { + if (storage == null) + { + filesToBeRemoved.add(fileId); + } + else + { + storage.removeFile(storageFile.getId()); + } + factory.deleteFileName(fileId); + try + { + if (isOpen()) + { + close(); + } + } + catch (Exception e) + { + LOG.error("Error deleting file.", e); + } + } + + + @Override + public void write(ActiveMQBuffer bytes, boolean sync, IOCallback callback) + throws Exception + { + if (timedBuffer != null) + { + bytes.setIndex(0, bytes.capacity()); + timedBuffer.addBytes(bytes, sync, callback); + } + else + { + int readableBytes = bytes.readableBytes(); + ByteBuffer buffer = factory.newBuffer(readableBytes); + bytes.getBytes(bytes.readerIndex(), buffer); + buffer.flip(); + writeDirect(buffer, sync, callback); + } + } + + + @Override + public void write(ActiveMQBuffer bytes, boolean sync) + throws Exception + { + if (sync) + { + SimpleWaitIOCallback completion = new SimpleWaitIOCallback(); + write(bytes, sync, completion); + completion.waitCompletion(); + } + else + { + write(bytes, sync, DummyCallback.getInstance()); + } + } + + + @Override + public void write(EncodingSupport bytes, boolean sync, IOCallback callback) + throws Exception + { + if (timedBuffer != null) + { + timedBuffer.addBytes(bytes, sync, callback); + } + else + { + int encodedSize = bytes.getEncodeSize(); + ByteBuffer buffer = factory.newBuffer(encodedSize); + ActiveMQBuffer outBuffer = ActiveMQBuffers.wrappedBuffer(buffer); + bytes.encode(outBuffer); + buffer.clear(); + buffer.limit(encodedSize); + writeDirect(buffer, sync, callback); + } + } + + + @Override + public void write(EncodingSupport bytes, boolean sync) + throws Exception + { + if (sync) + { + SimpleWaitIOCallback completion = new SimpleWaitIOCallback(); + write(bytes, sync, completion); + completion.waitCompletion(); + } + else + { + write(bytes, sync, DummyCallback.getInstance()); + } + } + + + @Override + public void writeDirect(ByteBuffer bytes, boolean sync, IOCallback callback) + { + internalWrite(bytes, sync, callback, true); + } + + + @Override + public void writeDirect(ByteBuffer bytes, boolean sync) + throws Exception + { + writeDirect(bytes, sync, DummyCallback.getInstance()); + } + + + @Override + public void blockingWriteDirect(ByteBuffer bytes, boolean sync, boolean releaseBuffer) + throws Exception + { + internalWrite(bytes, sync, null, releaseBuffer); + } + + + @Override + public int read(ByteBuffer bytes, IOCallback callback) + throws Exception + { + int bytesRead = 0; + if (bytes.hasArray()) + { + bytesRead = storageFileIn.read(bytes.array(), bytes.position() + bytes.arrayOffset(), bytes.remaining()); + readPosition.addAndGet(bytesRead); + bytes.position(bytes.position() + bytesRead); + bytes.flip(); + } + callback.done(); + return bytesRead; + } + + + @Override + public int read(ByteBuffer bytes) + throws Exception + { + return read(bytes, DummyCallback.getInstance()); + } + + + @Override + public void position(long pos) + throws IOException + { + readPosition.set(pos); + } + + + @Override + public long position() + { + return readPosition.get(); + } + + + @Override + public void close() + throws Exception + { + if (tempData == null) + { + LOG.info("Closed file {}", fileId); + writeCurrentSizeToFileSizeFile(); + storageFileOut.flush(); + storageFileIn.close(); + storageFileOut.close(); + storageFile = null; + storageFileIn = null; + storageFileOut = null; + } + position(0); + isOpen.set(false); + } + + + @Override + public void sync() + throws IOException + { + measure("sync", () -> + { + if ( ++syncCallCounter % 100 == 0) + { + LOG.info("Called sync for the {}th time", syncCallCounter); + LOG.info("Bytes received so far: {}", bytesReceived); + } + storageFileOut.flush(); +// storageFileOut.close(); +// storageFile = storage.getFile(fileId); +// storageFileOut = storage.getOutputStream(storageFile); + return null; + }); + } + + + @Override + public long size() + throws Exception + { + return storageFile != null ? storageFile.getSize() + bytesReceived : readFileSizeFromFile(); + } + + + @Override + public void renameTo(String newFileName) + throws Exception + { + factory.changeFileName(newFileName, fileId); + if (tempData == null) + { + InputStream in = storage.getInputStream(storageFile.getId()); + byte[] content = in.readAllBytes(); + in.close(); + StorageFile newFile = storage.createFile(newFileName); + OutputStream out = storage.getOutputStream(newFile); + out.write(content); + out.close(); + storage.removeFile(storageFile.getId()); + storageFile = newFile; + } + else + { + fileName = newFileName; + } + } + + + @Override + public SequentialFile cloneFile() + { + return new DataStoreSequentialFile(fileName, storageFile.getId(), factory); + } + + + @Override + public void copyTo(SequentialFile newFileName) + throws Exception + { + InputStream in = storage.getInputStream(storageFile.getId()); + byte[] content = in.readAllBytes(); + in.close(); + StorageFile newFile = storage.getFile(newFileName.getFileName()); + OutputStream out = storage.getOutputStream(newFile); + out.write(content); + out.close(); + } + + + @Override + public void setTimedBuffer(TimedBuffer buffer) + { + timedBuffer = buffer; + } + + + @Override + public File getJavaFile() + { + return null; + } + + private Runnable checkForStorageService = () -> + { + while (storage == null) + { + try + { + storage = StorageService.getStorage(); + LOG.info("Success! Storage is now active. Will write any temporary data to data store."); + isAlreadyLookingForStorage = false; + if (tempData != null) + { + for (String s : filesToBeRemoved) + { + storage.removeFile(s); + } + synchronized (tempDataSemaphore) + { + open(); + byte[] data = tempData.toByteArray(); + tempData.close(); + tempData = null; + internalWrite(ByteBuffer.wrap(data), true, null, true); + } + } + } + catch (StorageException e) + { + LOG.warn("Failed to get storage. Will try again. Error msg: {}", e.getMessage()); + try + { + Thread.sleep(3000); + } + catch (InterruptedException e1) + { + Thread.currentThread().interrupt(); + } + } + catch (Exception e) + { + LOG.error("Failed to open file.", e); + } + } + }; + + private void getStorage() + { + if (storage != null || isAlreadyLookingForStorage) + { + return; + } + try + { + storage = StorageService.getStorage(); + } + catch (StorageException e) + { + LOG.error("Failed to get Storage initially. Will try in the background."); + isAlreadyLookingForStorage = true; + new Thread(checkForStorageService).start(); + } + } + + + private void internalWrite(ByteBuffer bytes, boolean sync, IOCallback callback, boolean releaseBuffer) + { + try + { + if (bytes.hasArray()) + { + if (tempData == null) + { + bytesReceived += bytes.remaining(); + storageFileOut.write(bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.remaining()); + bytes.position(bytes.limit()); + } + else + { + synchronized (tempDataSemaphore) + { + tempData.write(bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.remaining()); + } + } + } + + if (/*sync &&*/ tempData == null && bytesReceived % 9000 == 0 && bytesReceived > 0) // will never flush the remaining bytes + { + sync(); + } + + if (callback != null) + { + callback.done(); + } + } + catch (IOException e) + { + LOG.error("Error writing to output stream.", e); + } + finally + { + if (releaseBuffer) + { + factory.releaseBuffer(bytes); + } + } + } + + + private T measure(String message, Callable action) + throws RuntimeException + { + long startTime = System.currentTimeMillis(); + try + { + T result = action.call(); + return result; + } + catch (Exception e) + { + throw new RuntimeException(e); + } + finally + { + long delta = System.currentTimeMillis() - startTime; + if (delta > 20) + { + LOG.warn("{} took {}ms", message, delta); + } + } + } + + + private void createFileSizeFile() + { + fileSizeFile = Paths.get(System.getProperty("bisas.temp"), "file-size-file.txt"); + if (Files.notExists(fileSizeFile)) + { + try + { + Files.createFile(fileSizeFile); + } + catch (IOException e) + { + e.printStackTrace(); + } + } + } + + + private void writeCurrentSizeToFileSizeFile() + { + updateValue(storageFile.getSize() + bytesReceived); + } + + + private void writeSizeToFileSizeFile(long size) + { + updateValue(size); + } + + + private void updateValue(long size) + { + Map fileSizes = parseFileSizeFile(); + fileSizes.put(fileId, size); + try + { + Files.delete(fileSizeFile); + createFileSizeFile(); + fileSizes.entrySet().stream().forEach(entry -> + { + try + { + Files.writeString(fileSizeFile, String.format("%s:%d%n", entry.getKey(), entry.getValue()), StandardOpenOption.APPEND); + } + catch (IOException e) + { + e.printStackTrace(); + } + }); + } + catch (IOException e) + { + e.printStackTrace(); + } + } + + + private long readFileSizeFromFile() + { + try + { + return parseFileSizeFile().get(fileId); + } + catch (NumberFormatException e) + { + e.printStackTrace(); + } + return -1; + } + + + private Map parseFileSizeFile() + { + Map fileSizes = new HashMap<>(); + try + { + List lines = Files.readAllLines(fileSizeFile); + lines.stream().forEach(line -> + { + String id = line.split(":")[0]; + long size = Long.parseLong(line.split(":")[1]); + fileSizes.put(id, size); + }); + } + catch (IOException e) + { + e.printStackTrace(); + } + return fileSizes; + } +} diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/datastore/DataStoreSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/datastore/DataStoreSequentialFileFactory.java new file mode 100644 index 00000000000..05b295dba0d --- /dev/null +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/datastore/DataStoreSequentialFileFactory.java @@ -0,0 +1,245 @@ +/* + * DataStoreSequentialFileFactory.java + * + * created at 2023-07-17 by f.gervasi f.gervasi@seeburger.de + * + * Copyright (c) SEEBURGER AG, Germany. All Rights Reserved. + */ +package org.apache.activemq.artemis.core.io.datastore; + + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import org.apache.activemq.artemis.core.io.AbstractSequentialFileFactory; +import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; +import org.apache.activemq.artemis.core.io.SequentialFile; +import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer; + +import com.seeburger.storage.common.util.StorageUtil; + +import io.netty.util.internal.PlatformDependent; + + +public class DataStoreSequentialFileFactory extends AbstractSequentialFileFactory +{ + private Path mapping; + private String instanceId; // default is own instance ID + private Map createdFiles = new HashMap<>(); + + protected DataStoreSequentialFileFactory(File journalDir, boolean buffered, int bufferSize, int bufferTimeout, int maxIO, + boolean logRates, IOCriticalErrorListener criticalErrorListener, + CriticalAnalyzer criticalAnalyzer) + { + super(journalDir, buffered, bufferSize, bufferTimeout, maxIO, logRates, criticalErrorListener, criticalAnalyzer); + } + + + public DataStoreSequentialFileFactory(boolean logRates, IOCriticalErrorListener criticalErrorListener, + CriticalAnalyzer criticalAnalyzer) + { + this(null, false, 0, 0, 1, logRates, criticalErrorListener, criticalAnalyzer); +// mapping = Paths.get(System.getProperty("bisas.temp"), "datastore-mapping.txt"); +// createMappingFile(); + } + + + @Override + public SequentialFile createSequentialFile(String fileName) + { + // do not always create a new UUID! + // look in the mapping table for the UUID mapped to this fileName + String fileId = StorageUtil.createUUID(); +// if (!isMappingExistsForFileName(fileName)) +// { +// fileId = StorageUtil.createUUID(); +// putNewFile(fileId, fileName); +// } +// else +// { +// fileId = getCreatedFiles().entrySet().stream().filter(entry -> entry.getValue().equals(fileName)).findFirst().get().getKey(); +// } + createdFiles.put(fileId, fileName); + return new DataStoreSequentialFile(fileName, fileId, this); + } + + + @Override + public boolean isSupportsCallbacks() + { + return false; + } + + + @Override + public ByteBuffer allocateDirectBuffer(int size) + { + return ByteBuffer.allocateDirect(size); + } + + + @Override + public void releaseDirectBuffer(ByteBuffer buffer) + { + if (buffer.isDirect()) + { + PlatformDependent.freeDirectBuffer(buffer); + } + } + + + @Override + public ByteBuffer newBuffer(int size) + { + return ByteBuffer.allocate(size); + } + + + @Override + public ByteBuffer wrapBuffer(byte[] bytes) + { + return ByteBuffer.wrap(bytes); + } + + + @Override + public int calculateBlockSize(int bytes) + { + return bytes; + } + + + @Override + public void clearBuffer(ByteBuffer buffer) + { + if (buffer.isDirect()) + { + PlatformDependent.setMemory(PlatformDependent.directBufferAddress(buffer), buffer.limit(), (byte)0); + } + else + { + Arrays.fill(buffer.array(), buffer.arrayOffset(), buffer.limit(), (byte)0); + } + } + + + @Override + public List listFiles(String extension) + throws Exception + { + // retrieve the files from the DataStore which are found in the mapping table of the instance with the instanceId + // SELECT * FROM zuordung WHERE instance_id = this.instanceId AND file_name.endwith(extension) +// return getCreatedFiles().values().stream().filter(name -> name.endsWith(extension)).collect(Collectors.toList()); + return Collections.emptyList(); + } + + + public void deleteFileName(String fileId) + { +// try +// { +// List lines = Files.readAllLines(mapping).stream().filter(line -> !line.startsWith(fileId)).collect(Collectors.toList()); +// Files.delete(mapping); +// createMappingFile(); +// lines.stream().forEach(line -> +// { +// try +// { +// Files.writeString(mapping, String.format("%s%n", line), StandardOpenOption.APPEND); +// } +// catch (IOException e) +// { +// e.printStackTrace(); +// } +// }); +// } +// catch (IOException e) +// { +// e.printStackTrace(); +// } + createdFiles.remove(fileId); + } + + + public void changeFileName(String newName, String fileId) + { +// deleteFileName(fileId); +// putNewFile(fileId, newName); + createdFiles.put(fileId, newName); + } + + + public void setInstance(String instanceId) + { + this.instanceId = instanceId; + } + + + public Map getCreatedFiles() + { + Map createdFiles = new HashMap<>(); + try + { + List lines = Files.readAllLines(mapping); + lines.stream().forEach(line -> + { + String fileId = line.split(":")[0]; + String fileName = line.split(":")[1]; + createdFiles.put(fileId, fileName); + }); + } + catch (IOException e) + { + e.printStackTrace(); + } + return createdFiles; + } + + + private void putNewFile(String fileId, String fileName) + { + try + { + Files.writeString(mapping, String.format("%s:%s%n", fileId, fileName), StandardOpenOption.APPEND); + } + catch (IOException e) + { + e.printStackTrace(); + } + } + + + private void createMappingFile() + { + if (Files.notExists(mapping)) + { + try + { + Files.createFile(mapping); + } + catch (IOException e) + { + e.printStackTrace(); + } + } + } + + + private boolean isMappingExistsForFileName(String fileName) + { + Map mappings = getCreatedFiles(); + return mappings.containsValue(fileName); + } +} diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/vfs/VFSSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/vfs/VFSSequentialFile.java new file mode 100644 index 00000000000..5dfbdf4ef1c --- /dev/null +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/vfs/VFSSequentialFile.java @@ -0,0 +1,532 @@ +/* + * VFSSequentialFile.java + * + * created at 2023-08-29 by f.gervasi f.gervasi@seeburger.de + * + * Copyright (c) SEEBURGER AG, Germany. All Rights Reserved. + */ +package org.apache.activemq.artemis.core.io.vfs; + + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.lang.invoke.MethodHandles; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.WritableByteChannel; +import java.nio.file.Paths; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; + +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.ActiveMQBuffers; +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.core.io.DummyCallback; +import org.apache.activemq.artemis.core.io.IOCallback; +import org.apache.activemq.artemis.core.io.SequentialFile; +import org.apache.activemq.artemis.core.io.buffer.TimedBuffer; +import org.apache.activemq.artemis.core.journal.EncodingSupport; +import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.seeburger.vfs.application.client.api.VFSClientService; +import com.seeburger.vfs.application.client.api.context.VFSContext; +import com.seeburger.vfs.application.client.api.model.VFSFile; +import com.seeburger.vfs.application.client.api.model.VFSFileOperationMode; +import com.seeburger.vfs.client.api.VFSException; +import com.seeburger.recover.impl.vfs.VFSClientServiceFactory; +import com.seeburger.seeauth.LoginSession; + + +public class VFSSequentialFile implements SequentialFile +{ + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private String fileName; + private String journalDir; + private VFSSequentialFileFactory factory; + private VFSClientService vfsClient; + private CountDownLatch vfsClientFinished = new CountDownLatch(1); + private ByteArrayOutputStream tempData = new ByteArrayOutputStream(); + private boolean isOpen = false; + private OutputStream fileOut; + private WritableByteChannel fileChannel; + private WritableByteChannel tempChannel; + + public VFSSequentialFile(String fileName, String journalDir, VFSSequentialFileFactory factory) + { + this.fileName = fileName; + this.journalDir = journalDir; + this.factory = factory; + tempChannel = Channels.newChannel(tempData); + new Thread(tryAndGetVFSClient).start(); + } + + private Runnable tryAndGetVFSClient = () -> + { + try + { + LoginSession.createSession(LoginSession.LS_ALL, null); + VFSContext.enterVFSContext("VFSJournal"); + this.vfsClient = VFSClientServiceFactory.getVFSClientService(); + VFSFile file = null; + while (file == null) + { + file = tryToSetFile(); + } + fileOut = file.getOutputStream(VFSFileOperationMode.APPEND); + fileChannel = Channels.newChannel(fileOut); + fileOut.write(tempData.toByteArray()); + tempChannel.close(); + vfsClientFinished.countDown(); + } + catch (IllegalStateException | VFSException | IOException e) + { + LOG.error("Waiting for VFS to become available took too long.", e); + } + }; + + private VFSFile tryToSetFile() + { + VFSFile file = null; + try + { + if (!vfsClient.existsFile(Paths.get(journalDir, fileName))) + { + file = vfsClient.createFile(Paths.get(journalDir, fileName)); + } + else + { + file = vfsClient.getFile(Paths.get(journalDir, fileName)); + } + } + catch (Exception e) + { + try + { + LOG.info("Will sleep 5 seconds."); + Thread.sleep(5000); + } + catch (InterruptedException e1) + { + Thread.currentThread().interrupt(); + } + } + return file; + } + + + @Override + public boolean isOpen() + { + return isOpen; + } + + + @Override + public boolean exists() + { + if (vfsClientFinished.getCount() == 0) + { + try + { + return vfsClient.existsFile(Paths.get(journalDir, fileName)); + } + catch (VFSException e) + { + LOG.error(e.getMessage()); + } + } + return false; + } + + + @Override + public void open() + throws Exception + { + open(1, false); + } + + + @Override + public void open(int maxIO, boolean useExecutor) + throws Exception + { + if (vfsClientFinished.getCount() == 0) + { + if (!exists()) + { + vfsClient.createFile(Paths.get(journalDir, fileName)); + } + fileOut = vfsClient.getFile(Paths.get(journalDir, fileName)).getOutputStream(VFSFileOperationMode.APPEND); + } + isOpen = true; + } + + + @Override + public ByteBuffer map(int position, long size) + throws IOException + { + return null; + } + + + @Override + public boolean fits(int size) + { + return true; + } + + + @Override + public int calculateBlockStart(int position) + throws Exception + { + return position; + } + + + @Override + public String getFileName() + { + return fileName; + } + + + @Override + public void fill(int size) + throws Exception + { + // Nothing to do here + } + + + @Override + public void delete() + throws IOException, InterruptedException, ActiveMQException + { + if (vfsClientFinished.getCount() == 0) + { + try + { + vfsClient.delete(Paths.get(journalDir, fileName), false); + } + catch (VFSException e) + { + LOG.error(e.getMessage()); + } + } + } + + + @Override + public void write(ActiveMQBuffer bytes, boolean sync, IOCallback callback) + throws Exception + { + int readableBytes = bytes.readableBytes(); + ByteBuffer buffer = factory.newBuffer(readableBytes); + buffer.limit(readableBytes); + bytes.getBytes(bytes.readerIndex(), buffer); + buffer.flip(); + writeDirect(buffer, sync, callback); + } + + + @Override + public void write(ActiveMQBuffer bytes, boolean sync) + throws Exception + { + if (sync) + { + SimpleWaitIOCallback completion = new SimpleWaitIOCallback(); + write(bytes, true, completion); + completion.waitCompletion(); + } + else + { + write(bytes, false, DummyCallback.getInstance()); + } + } + + + @Override + public void write(EncodingSupport bytes, boolean sync, IOCallback callback) + throws Exception + { + int encodedSize = bytes.getEncodeSize(); + ByteBuffer buffer = factory.newBuffer(encodedSize); + ActiveMQBuffer outBuffer = ActiveMQBuffers.wrappedBuffer(buffer); + bytes.encode(outBuffer); + buffer.clear(); + buffer.limit(encodedSize); + writeDirect(buffer, sync, callback); + } + + + @Override + public void write(EncodingSupport bytes, boolean sync) + throws Exception + { + if (sync) + { + SimpleWaitIOCallback completion = new SimpleWaitIOCallback(); + write(bytes, true, completion); + completion.waitCompletion(); + } + else + { + write(bytes, false, DummyCallback.getInstance()); + } + } + + + @Override + public void writeDirect(ByteBuffer bytes, boolean sync, IOCallback callback) + { + internalWrite(bytes, sync, callback, true); + } + + + @Override + public void writeDirect(ByteBuffer bytes, boolean sync) + throws Exception + { + internalWrite(bytes, sync, null, true); + } + + + @Override + public void blockingWriteDirect(ByteBuffer bytes, boolean sync, boolean releaseBuffer) + throws Exception + { + internalWrite(bytes, sync, null, releaseBuffer); + } + + + @Override + public int read(ByteBuffer bytes, IOCallback callback) + throws Exception + { + if (vfsClientFinished.getCount() != 0) + { + LOG.error("Cannot read file because VFS Client is not available yet."); + return -1; + } + VFSFile file = vfsClient.getFile(Paths.get(journalDir, fileName)); + InputStream fileIn = file.getInputStream(); + int bytesRead = 0; + if (bytes.hasArray()) + { + bytesRead = fileIn.read(bytes.array(), bytes.arrayOffset() + bytes.remaining(), bytes.limit()); + } + callback.done(); + return bytesRead; + } + + + @Override + public int read(ByteBuffer bytes) + throws Exception + { + return read(bytes, DummyCallback.getInstance()); + } + + + @Override + public void position(long pos) + throws IOException + { + // Not needed right now + } + + + @Override + public long position() + { + return 0; + } + + + @Override + public void close() + throws Exception + { + if (vfsClientFinished.getCount() == 0) + { + fileChannel.close(); + } + isOpen = false; + } + + + @Override + public void sync() + throws IOException + { + LoginSession.createSession(LoginSession.LS_ALL, null); + VFSContext.enterVFSContext("VFSJournal"); + fileChannel.close(); + try + { + fileOut = vfsClient.getFile(Paths.get(journalDir, fileName)).getOutputStream(VFSFileOperationMode.APPEND); + fileChannel = Channels.newChannel(fileOut); + } + catch (VFSException e) + { + LOG.error("Could not get OutputStream for file!", e); + } + } + + + @Override + public long size() + throws Exception + { + if (vfsClientFinished.getCount() == 0) + { + return vfsClient.getFile(Paths.get(journalDir, fileName)).getSize(); + } + return 0; + } + + + @Override + public void renameTo(String newFileName) + throws Exception + { + if (vfsClientFinished.getCount() == 0) + { + VFSFile oldFile = vfsClient.getFile(Paths.get(journalDir, fileName)); + VFSFile newFile = vfsClient.createFile(Paths.get(journalDir, newFileName)); + OutputStream newFileOut = newFile.getOutputStream(); + InputStream oldFileIn = oldFile.getInputStream(); + byte[] allBytes = oldFileIn.readAllBytes(); + newFileOut.write(allBytes); + newFileOut.close(); + oldFileIn.close(); + vfsClient.delete(Paths.get(journalDir, fileName), false); + fileName = newFileName; + } + } + + + @Override + public SequentialFile cloneFile() + { + return new VFSSequentialFile(fileName, journalDir, factory); + } + + + @Override + public void copyTo(SequentialFile newFileName) + throws Exception + { + if (vfsClientFinished.getCount() == 0) + { + vfsClient.move(Paths.get(journalDir, fileName), Paths.get(journalDir, newFileName.getFileName())); + } + } + + + @Override + public void setTimedBuffer(TimedBuffer buffer) + { + // Do nothing + } + + + @Override + public File getJavaFile() + { + return null; + } + + + private void internalWrite(ByteBuffer bytes, boolean sync, IOCallback callback, boolean releaseBuffer) + { + try + { + if (vfsClientFinished.getCount() == 0) + { + internalWriteWithFileStreamData(bytes); + } + else + { + internalWriteWithTempStreamData(bytes); + } + + if (sync && vfsClientFinished.getCount() == 0) + { + sync(); + } + + if (callback != null) + { + callback.done(); + } + } + catch (IOException e) + { + LOG.error(e.getMessage()); + } + finally + { + if (releaseBuffer) + { + factory.releaseBuffer(bytes); + } + } + } + + + private void internalWriteWithTempStreamData(ByteBuffer buffer) + throws IOException + { + if (buffer.hasArray()) + { + tempData.write(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining()); + } + else + { + tempChannel.write(buffer); + } + } + + + private void internalWriteWithFileStreamData(ByteBuffer buffer) + throws IOException + { + if (buffer.hasArray()) + { + fileOut.write(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining()); + } + else + { + fileChannel.write(buffer); + } + } + + + private T measureTime(Callable call, String name) + { + try + { + long start = System.currentTimeMillis(); + T value = call.call(); + long delta = System.currentTimeMillis() - start; + LOG.info("Call to {} took {} ms.", name, delta); + return value; + } + catch (Exception e) + { + e.printStackTrace(); + } + return null; + } + +} diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/vfs/VFSSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/vfs/VFSSequentialFileFactory.java new file mode 100644 index 00000000000..bc6b273f890 --- /dev/null +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/vfs/VFSSequentialFileFactory.java @@ -0,0 +1,120 @@ +/* + * VFSSequentialFileFactory.java + * + * created at 2023-08-29 by f.gervasi f.gervasi@seeburger.de + * + * Copyright (c) SEEBURGER AG, Germany. All Rights Reserved. + */ +package org.apache.activemq.artemis.core.io.vfs; + + +import java.io.File; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import org.apache.activemq.artemis.core.io.AbstractSequentialFileFactory; +import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; +import org.apache.activemq.artemis.core.io.SequentialFile; +import org.apache.activemq.artemis.utils.Env; +import org.apache.activemq.artemis.utils.PowerOf2Util; +import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer; + +import io.netty.util.internal.PlatformDependent; + + +public class VFSSequentialFileFactory extends AbstractSequentialFileFactory +{ + + public VFSSequentialFileFactory(File journalDir, boolean buffered, int bufferSize, int bufferTimeout, int maxIO, boolean logRates, + IOCriticalErrorListener criticalErrorListener, CriticalAnalyzer criticalAnalyzer) + { + super(journalDir, buffered, bufferSize, bufferTimeout, maxIO, logRates, criticalErrorListener, criticalAnalyzer); + } + + + @Override + public SequentialFile createSequentialFile(String fileName) + { + return new VFSSequentialFile(fileName, journalDir.getPath(), this); + } + + + @Override + public boolean isSupportsCallbacks() + { + return false; + } + + + @Override + public ByteBuffer allocateDirectBuffer(int size) + { + int requiredCapacity = PowerOf2Util.align(size, Env.osPageSize()); + ByteBuffer buffer = ByteBuffer.allocateDirect(requiredCapacity); + buffer.limit(size); + return buffer; + } + + + @Override + public void releaseDirectBuffer(ByteBuffer buffer) + { + if (buffer.isDirect()) + { + PlatformDependent.freeDirectBuffer(buffer); + } + } + + + @Override + public ByteBuffer newBuffer(int size) + { + return newBuffer(size, true); + } + + + @Override + public ByteBuffer newBuffer(int size, boolean zeroed) + { + return allocateDirectBuffer(size); + } + + + @Override + public ByteBuffer wrapBuffer(byte[] bytes) + { + return ByteBuffer.wrap(bytes); + } + + + @Override + public int calculateBlockSize(int bytes) + { + return bytes; + } + + + @Override + public void clearBuffer(ByteBuffer buffer) + { + if (buffer.isDirect()) + { + PlatformDependent.setMemory(PlatformDependent.directBufferAddress(buffer), buffer.limit(), (byte)0); + } + else + { + Arrays.fill(buffer.array(), buffer.arrayOffset(), buffer.limit(), (byte)0); + } + } + + + @Override + public List listFiles(String extension) + throws Exception + { + return Collections.emptyList(); + } + +} diff --git a/artemis-server-osgi/pom.xml b/artemis-server-osgi/pom.xml index 8ee7f3a8b9f..9a63517e6a6 100644 --- a/artemis-server-osgi/pom.xml +++ b/artemis-server-osgi/pom.xml @@ -1,13 +1,13 @@ + license agreements. See the NOTICE file distributed with this work for additional + information regarding copyright ownership. The ASF licenses this file to + You under the Apache License, Version 2.0 (the "License"); you may not use + this file except in compliance with the License. You may obtain a copy of + the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required + by applicable law or agreed to in writing, software distributed under the + License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS + OF ANY KIND, either express or implied. See the License for the specific + language governing permissions and limitations under the License. --> 4.0.0 diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java index 336a1820df0..b0df600bf3b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java @@ -43,8 +43,10 @@ import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory; +import org.apache.activemq.artemis.core.io.datastore.DataStoreSequentialFileFactory; import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory; import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; +import org.apache.activemq.artemis.core.io.vfs.VFSSequentialFileFactory; import org.apache.activemq.artemis.core.journal.EncoderPersister; import org.apache.activemq.artemis.core.journal.EncodingSupport; import org.apache.activemq.artemis.core.journal.Journal; @@ -164,6 +166,12 @@ protected void init(Configuration config, IOCriticalErrorListener criticalErrorL } journalFF = new MappedSequentialFileFactory(config.getJournalLocation(), config.getJournalFileSize(), true, config.getJournalBufferSize_NIO(), config.getJournalBufferTimeout_NIO(), criticalErrorListener); break; + case DATA_STORE: + journalFF = new DataStoreSequentialFileFactory(config.isLogJournalWriteRate(), criticalErrorListener, getCriticalAnalyzer()); + break; + case VFS: + journalFF = new VFSSequentialFileFactory(config.getJournalLocation(), false, 0, 0, 1, false, criticalErrorListener, getCriticalAnalyzer()); + break; default: throw ActiveMQMessageBundle.BUNDLE.invalidJournalType2(config.getJournalType()); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/JournalType.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/JournalType.java index df60e9beef8..74f63735b3b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/JournalType.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/JournalType.java @@ -18,7 +18,7 @@ public enum JournalType { - NIO, ASYNCIO, MAPPED; + NIO, ASYNCIO, MAPPED, DATA_STORE, VFS; public static final String validValues; @@ -41,6 +41,8 @@ public static JournalType getType(String type) { case "NIO": return NIO; case "ASYNCIO" : return ASYNCIO; case "MAPPED" : return MAPPED; + case "DATA_STORE": return DATA_STORE; + case "VFS": return VFS; default: throw new IllegalStateException("Invalid JournalType:" + type + " valid Types: " + validValues); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/seeburger/healthcheck/ClusterHealthcheck.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/seeburger/healthcheck/ClusterHealthcheck.java new file mode 100644 index 00000000000..d162c01209c --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/seeburger/healthcheck/ClusterHealthcheck.java @@ -0,0 +1,91 @@ +/* + * ClusterHealthcheck.java + * + * created at 2023-07-29 by f.gervasi + * + * Copyright (c) SEEBURGER AG, Germany. All Rights Reserved. + */ +package org.apache.activemq.artemis.core.server.seeburger.healthcheck; + + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.activemq.artemis.api.core.Pair; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.io.datastore.DataStoreSequentialFileFactory; +import org.apache.activemq.artemis.core.paging.PagingManager; +import org.apache.activemq.artemis.core.persistence.AddressBindingInfo; +import org.apache.activemq.artemis.core.persistence.GroupingInfo; +import org.apache.activemq.artemis.core.persistence.QueueBindingInfo; +import org.apache.activemq.artemis.core.persistence.StorageManager; +import org.apache.activemq.artemis.core.persistence.impl.PageCountPending; +import org.apache.activemq.artemis.core.postoffice.PostOffice; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.impl.JournalLoader; +import org.apache.activemq.artemis.core.transaction.ResourceManager; + + +public class ClusterHealthcheck implements Runnable +{ + private ActiveMQServer server; + private StorageManager storageManager; + private JournalLoader loader; + private PostOffice postOffice; + private PagingManager pagingManager; + private ResourceManager resourceManager; + + public ClusterHealthcheck(ActiveMQServer server, StorageManager storageManager, JournalLoader loader, PostOffice postOffice, + PagingManager pagingManager, ResourceManager resourceManager) + { + this.server = server; + this.storageManager = storageManager; + this.loader = loader; + this.postOffice = postOffice; + this.pagingManager = pagingManager; + this.resourceManager = resourceManager; + } + + + @Override + public void run() // start this inside initialisePart2 after own journal is loaded + { + while (server.isStarted()) + { + // start discovery of process engines and their instance IDs + // try to get locks on resources with those IDs and of type "messaging" + // if a lock is acquired/an outage is detected + // read file IDs from this instance from database + // use StorageManager and JournalLoader to load foreign journals into own PostOffice + try + { + List queueBindings = new ArrayList<>(); + List addressBindings = new ArrayList<>(); + List groupings = new ArrayList<>(); + Map queueBindingsMap = new HashMap<>(); + + DataStoreSequentialFileFactory factory = (DataStoreSequentialFileFactory)storageManager.getJournalSequentialFileFactory(); + factory.setInstance("instanceId"); // the instance which failed + storageManager.loadBindingJournal(queueBindings, groupings, addressBindings); + loader.initAddresses(addressBindings); + loader.initQueues(queueBindingsMap, queueBindings); + + Map>> duplicateIDMap = new HashMap<>(); + HashSet> pendingLargeMessages = new HashSet<>(); + List pendingNonTXPageCounter = new LinkedList<>(); + Set largeMessagesInFolder = new HashSet<>(); + storageManager.loadMessageJournal(postOffice, pagingManager, resourceManager, queueBindingsMap, duplicateIDMap, + pendingLargeMessages, largeMessagesInFolder, pendingNonTXPageCounter, loader); + } + catch (Exception e) + { + e.printStackTrace(); + } + } + } +} diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd index 5b23f0ffff0..558ba9215b3 100644 --- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd +++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd @@ -656,6 +656,8 @@ + +