diff --git a/jar/pb-core-6.4.0.jar b/jar/pb-core-6.4.0.jar new file mode 100644 index 0000000..49d6d3f Binary files /dev/null and b/jar/pb-core-6.4.0.jar differ diff --git a/src/bzh/plealog/dbmirror/fetcher/PAsperaLoader.java b/src/bzh/plealog/dbmirror/fetcher/PAsperaLoader.java index ff81596..1c56668 100644 --- a/src/bzh/plealog/dbmirror/fetcher/PAsperaLoader.java +++ b/src/bzh/plealog/dbmirror/fetcher/PAsperaLoader.java @@ -23,7 +23,6 @@ import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.commons.net.ftp.FTPClient; import bzh.plealog.dbmirror.util.aspera.AsperaCmd; import bzh.plealog.dbmirror.util.aspera.AsperaUtils; @@ -62,12 +61,17 @@ public String getWorkerBaseName() { } @Override - public void closeConnection(FTPClient ftp) { + public void closeLoader() { //nothing to do } @Override - protected boolean configureFtpClient(FTPClient ftp, DBServerConfig fsc) { + public boolean readyToDownload() { + return true; + } + + @Override + public boolean prepareLoader(DBServerConfig fsc) { boolean bRet=true; //check we have Aspera configuration //Aspera bin and ssh certificate are located in global config file @@ -100,7 +104,7 @@ protected boolean configureFtpClient(FTPClient ftp, DBServerConfig fsc) { } @Override - protected int downloadFile(FTPClient ftp, DBServerConfig fsc, DBMSFtpFile rFile, File file, long lclFSize) { + protected int downloadFile(DBServerConfig fsc, DBMSFtpFile rFile, File file, long lclFSize) { int iRet = 0; //no apsera cmd object: may happen if wrong configuration diff --git a/src/bzh/plealog/dbmirror/fetcher/PFTPLoader.java b/src/bzh/plealog/dbmirror/fetcher/PFTPLoader.java index c486e34..6676e2a 100755 --- a/src/bzh/plealog/dbmirror/fetcher/PFTPLoader.java +++ b/src/bzh/plealog/dbmirror/fetcher/PFTPLoader.java @@ -57,6 +57,7 @@ public class PFTPLoader { private String _loaderId; private int _timeout = 50000; private String _fileOfFiles; + private FTPClient _ftp = null; protected static final String CANCEL_MSG = "job cancelled"; protected static final String CONN_ERR_MSG = "Server does not answer. Retry..."; @@ -110,27 +111,33 @@ public void setUserProcessingMonitor(UserProcessingMonitor userMonitor) { public void setFileOfFiles(String fof) { _fileOfFiles = fof; } - public void closeConnection(FTPClient ftp) { + public void closeLoader() { // Logout from the FTP Server and disconnect // for unknown reasons, sometimes got a Connection Reset SocketException. // do nothing in that case try { - ftp.logout(); + _ftp.logout(); } catch (Exception e) { } } - public FTPClient openConnection(DBServerConfig fsc) { - FTPClient ftp; + public boolean readyToDownload() { + return (_ftp != null && _ftp.isConnected()); + } + + public boolean prepareLoader(DBServerConfig fsc) { + FTPClient ftp = null; ftp = new FTPClient(); if(!configureFtpClient(ftp, fsc)) { - ftp = null; + return false; } - return ftp; + + _ftp = ftp; + return true; } - protected boolean configureFtpClient(FTPClient ftp, DBServerConfig fsc) { + private boolean configureFtpClient(FTPClient ftp, DBServerConfig fsc) { int reply; boolean bRet = true; try { @@ -178,7 +185,6 @@ protected boolean configureFtpClient(FTPClient ftp, DBServerConfig fsc) { /** * Download a file from remote server. * - * @param ftp the ftp client * @param fsc bank descriptor * @param rFile remote file descriptor * @param file local file path @@ -186,7 +192,7 @@ protected boolean configureFtpClient(FTPClient ftp, DBServerConfig fsc) { * @return 1 if success, 0 if failure, 2 if skip (file already loaded ; when * resuming from a previous work) and 3 if aborted. * */ - protected int downloadFile(FTPClient ftp, DBServerConfig fsc, DBMSFtpFile rFile, File file, long lclFSize) { + protected int downloadFile(DBServerConfig fsc, DBMSFtpFile rFile, File file, long lclFSize) { FileOutputStream fos = null; InputStream ftpIS = null; String remoteFName; @@ -199,28 +205,28 @@ protected int downloadFile(FTPClient ftp, DBServerConfig fsc, DBMSFtpFile rFile, remoteFSize = rFile.getFtpFile().getSize(); try { // enter remote directory - if (ftp.changeWorkingDirectory(rFile.getRemoteDir())) { + if (_ftp.changeWorkingDirectory(rFile.getRemoteDir())) { // download file LoggerCentral.info(LOGGER, " " + getLoaderId() + ": download: " + rFile.getRemoteDir() + remoteFName); if (lclFSize!=0) { fos = new FileOutputStream(file, true); - ftp.setRestartOffset(lclFSize); + _ftp.setRestartOffset(lclFSize); } else { fos = new FileOutputStream(file); - ftp.setRestartOffset(0l); + _ftp.setRestartOffset(0l); } - ftpIS = ftp.retrieveFileStream(remoteFName); + ftpIS = _ftp.retrieveFileStream(remoteFName); if (ftpIS == null) { - throw new Exception(getLoaderId() + ": unable to open remote input stream: " + ftp.getReplyString()); + throw new Exception(getLoaderId() + ": unable to open remote input stream: " + _ftp.getReplyString()); } Util.copyStream(ftpIS, fos, Util.DEFAULT_COPY_BUFFER_SIZE, remoteFSize, new MyCopyStreamListener(getLoaderId(), _userMonitor, fsc.getName(), remoteFName, remoteFSize, lclFSize)); IOUtils.closeQuietly(ftpIS); fos.flush(); IOUtils.closeQuietly(fos); - if (ftp.completePendingCommand()) { + if (_ftp.completePendingCommand()) { file.setLastModified(remoteFDate.getTime()); } else { throw new Exception(getLoaderId() + ": unable to download full file."); @@ -262,7 +268,7 @@ protected int downloadFile(FTPClient ftp, DBServerConfig fsc, DBMSFtpFile rFile, * @return 1 if success, 0 if failure, 2 if skip (file already loaded ; when * resuming from a previous work) and 3 if aborted. */ - public int downloadFile(FTPClient ftp, DBServerConfig fsc, DBMSFtpFile rFile, + public int downloadFile(DBServerConfig fsc, DBMSFtpFile rFile, int fileNum, int totFiles) { File file, filegz, tmpDir; String remoteFName, name, msg; @@ -326,7 +332,7 @@ public int downloadFile(FTPClient ftp, DBServerConfig fsc, DBMSFtpFile rFile, UserProcessingMonitor.MSG_TYPE.OK, msg); } - iRet = downloadFile(ftp, fsc, rFile, file, lclFSize header_attrs=null; + + //prepare the server side file to retrieve + String fileToRetrive = url_base + "/" + rFile.getRemoteDir()+rFile.getFtpFile().getName(); + + LoggerCentral.info(LOGGER, " " + getLoaderId() + ": download: " + fileToRetrive); + + //resume downloading if needed + if (lclFSize!=0) { + LoggerCentral.info(LOGGER, "resume downloading at byte: "+lclFSize); + header_attrs = new HashMap<>(); + header_attrs.put( + HTTPBasicEngine.RANGE_HTTP, + String.format(HTTPBasicEngine.RANGE_HTTP_FORMAT, lclFSize)); + } + //go! + try { + HTTPBasicEngine.doGet(fileToRetrive, header_attrs, file, + new MyCopyStreamListener(getLoaderId(), _userMonitor, + fsc.getName(), rFile.getFtpFile().getName(), rFile.getFtpFile().getSize(), 0)); + iRet = 1; + } catch (HTTPEngineException e1) { + //do not raise warn or error here, handled by LoaderEngine + LoggerCentral.info(LOGGER, e1.getMessage() + " (" + e1.getHttpCode() + ")"); + } + if (_userMonitor.jobCancelled()) { + iRet=3; + } + return iRet; + } +} diff --git a/src/bzh/plealog/dbmirror/ui/RunningMirrorPanel.java b/src/bzh/plealog/dbmirror/ui/RunningMirrorPanel.java index 06bcfaa..e4cdbc5 100755 --- a/src/bzh/plealog/dbmirror/ui/RunningMirrorPanel.java +++ b/src/bzh/plealog/dbmirror/ui/RunningMirrorPanel.java @@ -53,6 +53,7 @@ import bzh.plealog.dbmirror.fetcher.PFTPLoader; import bzh.plealog.dbmirror.fetcher.PFTPLoaderDescriptor; import bzh.plealog.dbmirror.fetcher.PFTPLoaderSystem; +import bzh.plealog.dbmirror.fetcher.PHTTPLoader; import bzh.plealog.dbmirror.fetcher.PLocalLoader; import bzh.plealog.dbmirror.fetcher.UserProcessingMonitor; import bzh.plealog.dbmirror.indexer.LuceneUtils; @@ -494,7 +495,8 @@ private synchronized JLabel getFtpLabel(String workerId) { workerLabels.put(workerId, _ftpLblMsg[0]); lbl = _ftpLblMsg[0]; } else if (workerId.startsWith(PFTPLoader.FTP_WORKER) || - workerId.startsWith(PAsperaLoader.ASPC_WORKER)) { + workerId.startsWith(PAsperaLoader.ASPC_WORKER) || + workerId.startsWith(PHTTPLoader.HTTP_WORKER)) { int pos = workerId.indexOf('-'); String value = workerId.substring(pos + 1); int idx = Integer.valueOf(value); @@ -517,7 +519,8 @@ public synchronized JProgressBar getFtpProgress(String workerId) { workerProgresess.put(workerId, _ftpProgress[0]); lbl = _ftpProgress[0]; } else if (workerId.startsWith(PFTPLoader.FTP_WORKER) || - workerId.startsWith(PAsperaLoader.ASPC_WORKER)) { + workerId.startsWith(PAsperaLoader.ASPC_WORKER) || + workerId.startsWith(PHTTPLoader.HTTP_WORKER)) { int idx = Integer .valueOf(workerId.substring(workerId.indexOf('-') + 1)); lbl = _ftpProgress[idx >= DBMSAbstractConfig.getFileCopyWorkers() ? 0