Skip to content

Commit

Permalink
enable HTTP/HTTPS protocol as an alternative if FTP is not working
Browse files Browse the repository at this point in the history
  • Loading branch information
pgdurand committed Mar 29, 2021
1 parent 6df1693 commit f4c2032
Show file tree
Hide file tree
Showing 6 changed files with 148 additions and 39 deletions.
Binary file added jar/pb-core-6.4.0.jar
Binary file not shown.
12 changes: 8 additions & 4 deletions src/bzh/plealog/dbmirror/fetcher/PAsperaLoader.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.net.ftp.FTPClient;

import bzh.plealog.dbmirror.util.aspera.AsperaCmd;
import bzh.plealog.dbmirror.util.aspera.AsperaUtils;
Expand Down Expand Up @@ -62,12 +61,17 @@ public String getWorkerBaseName() {
}

@Override
public void closeConnection(FTPClient ftp) {
public void closeLoader() {
//nothing to do
}

@Override
protected boolean configureFtpClient(FTPClient ftp, DBServerConfig fsc) {
public boolean readyToDownload() {
return true;
}

@Override
public boolean prepareLoader(DBServerConfig fsc) {
boolean bRet=true;
//check we have Aspera configuration
//Aspera bin and ssh certificate are located in global config file
Expand Down Expand Up @@ -100,7 +104,7 @@ protected boolean configureFtpClient(FTPClient ftp, DBServerConfig fsc) {
}

@Override
protected int downloadFile(FTPClient ftp, DBServerConfig fsc, DBMSFtpFile rFile, File file, long lclFSize) {
protected int downloadFile(DBServerConfig fsc, DBMSFtpFile rFile, File file, long lclFSize) {
int iRet = 0;

//no apsera cmd object: may happen if wrong configuration
Expand Down
40 changes: 23 additions & 17 deletions src/bzh/plealog/dbmirror/fetcher/PFTPLoader.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public class PFTPLoader {
private String _loaderId;
private int _timeout = 50000;
private String _fileOfFiles;
private FTPClient _ftp = null;

protected static final String CANCEL_MSG = "job cancelled";
protected static final String CONN_ERR_MSG = "Server does not answer. Retry...";
Expand Down Expand Up @@ -110,27 +111,33 @@ public void setUserProcessingMonitor(UserProcessingMonitor userMonitor) {
public void setFileOfFiles(String fof) {
_fileOfFiles = fof;
}
public void closeConnection(FTPClient ftp) {
public void closeLoader() {
// Logout from the FTP Server and disconnect
// for unknown reasons, sometimes got a Connection Reset SocketException.
// do nothing in that case
try {
ftp.logout();
_ftp.logout();
} catch (Exception e) {
}
}

public FTPClient openConnection(DBServerConfig fsc) {
FTPClient ftp;
public boolean readyToDownload() {
return (_ftp != null && _ftp.isConnected());
}

public boolean prepareLoader(DBServerConfig fsc) {
FTPClient ftp = null;

ftp = new FTPClient();
if(!configureFtpClient(ftp, fsc)) {
ftp = null;
return false;
}
return ftp;

_ftp = ftp;
return true;
}

protected boolean configureFtpClient(FTPClient ftp, DBServerConfig fsc) {
private boolean configureFtpClient(FTPClient ftp, DBServerConfig fsc) {
int reply;
boolean bRet = true;
try {
Expand Down Expand Up @@ -178,15 +185,14 @@ protected boolean configureFtpClient(FTPClient ftp, DBServerConfig fsc) {
/**
* Download a file from remote server.
*
* @param ftp the ftp client
* @param fsc bank descriptor
* @param rFile remote file descriptor
* @param file local file path
*
* @return 1 if success, 0 if failure, 2 if skip (file already loaded ; when
* resuming from a previous work) and 3 if aborted.
* */
protected int downloadFile(FTPClient ftp, DBServerConfig fsc, DBMSFtpFile rFile, File file, long lclFSize) {
protected int downloadFile(DBServerConfig fsc, DBMSFtpFile rFile, File file, long lclFSize) {
FileOutputStream fos = null;
InputStream ftpIS = null;
String remoteFName;
Expand All @@ -199,28 +205,28 @@ protected int downloadFile(FTPClient ftp, DBServerConfig fsc, DBMSFtpFile rFile,
remoteFSize = rFile.getFtpFile().getSize();
try {
// enter remote directory
if (ftp.changeWorkingDirectory(rFile.getRemoteDir())) {
if (_ftp.changeWorkingDirectory(rFile.getRemoteDir())) {
// download file
LoggerCentral.info(LOGGER, " " + getLoaderId() + ": download: " + rFile.getRemoteDir() + remoteFName);

if (lclFSize!=0) {
fos = new FileOutputStream(file, true);
ftp.setRestartOffset(lclFSize);
_ftp.setRestartOffset(lclFSize);
}
else {
fos = new FileOutputStream(file);
ftp.setRestartOffset(0l);
_ftp.setRestartOffset(0l);
}
ftpIS = ftp.retrieveFileStream(remoteFName);
ftpIS = _ftp.retrieveFileStream(remoteFName);
if (ftpIS == null) {
throw new Exception(getLoaderId() + ": unable to open remote input stream: " + ftp.getReplyString());
throw new Exception(getLoaderId() + ": unable to open remote input stream: " + _ftp.getReplyString());
}
Util.copyStream(ftpIS, fos, Util.DEFAULT_COPY_BUFFER_SIZE, remoteFSize,
new MyCopyStreamListener(getLoaderId(), _userMonitor, fsc.getName(), remoteFName, remoteFSize, lclFSize));
IOUtils.closeQuietly(ftpIS);
fos.flush();
IOUtils.closeQuietly(fos);
if (ftp.completePendingCommand()) {
if (_ftp.completePendingCommand()) {
file.setLastModified(remoteFDate.getTime());
} else {
throw new Exception(getLoaderId() + ": unable to download full file.");
Expand Down Expand Up @@ -262,7 +268,7 @@ protected int downloadFile(FTPClient ftp, DBServerConfig fsc, DBMSFtpFile rFile,
* @return 1 if success, 0 if failure, 2 if skip (file already loaded ; when
* resuming from a previous work) and 3 if aborted.
*/
public int downloadFile(FTPClient ftp, DBServerConfig fsc, DBMSFtpFile rFile,
public int downloadFile(DBServerConfig fsc, DBMSFtpFile rFile,
int fileNum, int totFiles) {
File file, filegz, tmpDir;
String remoteFName, name, msg;
Expand Down Expand Up @@ -326,7 +332,7 @@ public int downloadFile(FTPClient ftp, DBServerConfig fsc, DBMSFtpFile rFile,
UserProcessingMonitor.MSG_TYPE.OK,
msg);
}
iRet = downloadFile(ftp, fsc, rFile, file, lclFSize<remoteFSize?lclFSize:0);
iRet = downloadFile(fsc, rFile, file, lclFSize<remoteFSize?lclFSize:0);
if (_userMonitor != null) {
_userMonitor.processingMessage(getLoaderId(), fsc.getName(),
UserProcessingMonitor.PROCESS_TYPE.FTP_LOADING,
Expand Down
22 changes: 6 additions & 16 deletions src/bzh/plealog/dbmirror/fetcher/PFTPLoaderEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.util.List;

import org.apache.commons.logging.LogFactory;
import org.apache.commons.net.ftp.FTPClient;

import bzh.plealog.dbmirror.util.aspera.AsperaUtils;
import bzh.plealog.dbmirror.util.conf.DBMSAbstractConfig;
Expand Down Expand Up @@ -104,31 +103,24 @@ private DataShuttle nextFtpFile() {
}

private class LoadWorker extends Thread {
private FTPClient _ftp = null;
private PFTPLoader _loader;
private int _id;

private LoadWorker(int id) {
_id = id;
}

private FTPClient getFtpClient() {
if (_ftp != null && _ftp.isConnected()) {
return _ftp;
}
_ftp = _loader.openConnection(get_dbsc());
return _ftp;
}

public void run() {
DataShuttle file;
FTPClient ftp;
String fName;
int retry, nFiles, bRet=0;

if (AsperaUtils.asperaAvailable() && get_dbsc().useAspera()) {
_loader = new PAsperaLoader(_id);
}
else if (get_dbsc().getFTPAternativeProtocol()!=null) {
_loader = new PHTTPLoader(_id);
}
else {
_loader = new PFTPLoader(_id);
}
Expand All @@ -141,10 +133,8 @@ public void run() {
_monitor.beginLoading(fName);
// try to get a ftp connection
retry = 0;
ftp = null;
while (retry < _retry) {
ftp = getFtpClient();
if (ftp != null)
if (_loader.prepareLoader(get_dbsc()))
break;
retry++;
LoggerCentral.info(
Expand All @@ -156,11 +146,11 @@ public void run() {
} catch (InterruptedException e) {
}
}
if (ftp != null) {
if (_loader.readyToDownload()) {
// start loading
retry = 0;
while (retry < _retry) {
bRet = _loader.downloadFile(ftp, get_dbsc(), file.getFile(),
bRet = _loader.downloadFile(get_dbsc(), file.getFile(),
file.getFileNum(), nFiles);
if (bRet != 0)
break;
Expand Down
106 changes: 106 additions & 0 deletions src/bzh/plealog/dbmirror/fetcher/PHTTPLoader.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/* Copyright (C) 2021 Patrick G. Durand
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* You may obtain a copy of the License at
*
* https://www.gnu.org/licenses/agpl-3.0.txt
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*/
package bzh.plealog.dbmirror.fetcher;

import java.io.File;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import bzh.plealog.bioinfo.io.http.HTTPBasicEngine;
import bzh.plealog.bioinfo.io.http.HTTPEngineException;
import bzh.plealog.dbmirror.util.conf.DBMSAbstractConfig;
import bzh.plealog.dbmirror.util.log.LoggerCentral;

/**
* This class is a file loader using HTTP or HTTPS protocol.
*
* @author Patrick G. Durand
*/
public class PHTTPLoader extends PFTPLoader {

private String url_base;

public static final String HTTP_WORKER = "HTTPLoader";

private static final Log LOGGER = LogFactory
.getLog(DBMSAbstractConfig.KDMS_ROOTLOG_CATEGORY
+ ".PHTTPLoader");

public PHTTPLoader(int id) {
super(id);
}

@Override
public String getWorkerBaseName() {
return HTTP_WORKER;
}

@Override
public void closeLoader() {
//nothing to do
}

@Override
public boolean readyToDownload() {
return true;
}

@Override
public boolean prepareLoader(DBServerConfig fsc) {
url_base = fsc.getFTPAternativeProtocol();
url_base+="://";
url_base+=fsc.getAddress();
return HTTPBasicEngine.isServerAvailable(url_base);
}

@Override
protected int downloadFile(DBServerConfig fsc, DBMSFtpFile rFile, File file, long lclFSize) {
int iRet = 0;
Map<String, String> header_attrs=null;

//prepare the server side file to retrieve
String fileToRetrive = url_base + "/" + rFile.getRemoteDir()+rFile.getFtpFile().getName();

LoggerCentral.info(LOGGER, " " + getLoaderId() + ": download: " + fileToRetrive);

//resume downloading if needed
if (lclFSize!=0) {
LoggerCentral.info(LOGGER, "resume downloading at byte: "+lclFSize);
header_attrs = new HashMap<>();
header_attrs.put(
HTTPBasicEngine.RANGE_HTTP,
String.format(HTTPBasicEngine.RANGE_HTTP_FORMAT, lclFSize));
}
//go!
try {
HTTPBasicEngine.doGet(fileToRetrive, header_attrs, file,
new MyCopyStreamListener(getLoaderId(), _userMonitor,
fsc.getName(), rFile.getFtpFile().getName(), rFile.getFtpFile().getSize(), 0));
iRet = 1;
} catch (HTTPEngineException e1) {
//do not raise warn or error here, handled by LoaderEngine
LoggerCentral.info(LOGGER, e1.getMessage() + " (" + e1.getHttpCode() + ")");
}
if (_userMonitor.jobCancelled()) {
iRet=3;
}
return iRet;
}
}
7 changes: 5 additions & 2 deletions src/bzh/plealog/dbmirror/ui/RunningMirrorPanel.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import bzh.plealog.dbmirror.fetcher.PFTPLoader;
import bzh.plealog.dbmirror.fetcher.PFTPLoaderDescriptor;
import bzh.plealog.dbmirror.fetcher.PFTPLoaderSystem;
import bzh.plealog.dbmirror.fetcher.PHTTPLoader;
import bzh.plealog.dbmirror.fetcher.PLocalLoader;
import bzh.plealog.dbmirror.fetcher.UserProcessingMonitor;
import bzh.plealog.dbmirror.indexer.LuceneUtils;
Expand Down Expand Up @@ -494,7 +495,8 @@ private synchronized JLabel getFtpLabel(String workerId) {
workerLabels.put(workerId, _ftpLblMsg[0]);
lbl = _ftpLblMsg[0];
} else if (workerId.startsWith(PFTPLoader.FTP_WORKER) ||
workerId.startsWith(PAsperaLoader.ASPC_WORKER)) {
workerId.startsWith(PAsperaLoader.ASPC_WORKER) ||
workerId.startsWith(PHTTPLoader.HTTP_WORKER)) {
int pos = workerId.indexOf('-');
String value = workerId.substring(pos + 1);
int idx = Integer.valueOf(value);
Expand All @@ -517,7 +519,8 @@ public synchronized JProgressBar getFtpProgress(String workerId) {
workerProgresess.put(workerId, _ftpProgress[0]);
lbl = _ftpProgress[0];
} else if (workerId.startsWith(PFTPLoader.FTP_WORKER) ||
workerId.startsWith(PAsperaLoader.ASPC_WORKER)) {
workerId.startsWith(PAsperaLoader.ASPC_WORKER) ||
workerId.startsWith(PHTTPLoader.HTTP_WORKER)) {
int idx = Integer
.valueOf(workerId.substring(workerId.indexOf('-') + 1));
lbl = _ftpProgress[idx >= DBMSAbstractConfig.getFileCopyWorkers() ? 0
Expand Down

0 comments on commit f4c2032

Please sign in to comment.