From b1e764a1638aeb382ff89726bee203e1fe6311d5 Mon Sep 17 00:00:00 2001 From: Greg Watson Date: Wed, 20 Jan 2016 17:28:24 -0500 Subject: [PATCH] Bug 479433 - Add better support for multiple input streams. Change-Id: I6234928c5ef6129cda3b77fd7e6467c0e817d7b4 Signed-off-by: Greg Watson --- .../internal/jsch/core/JSchConnection.java | 52 ++++--- .../core/commands/AbstractRemoteCommand.java | 4 +- .../core/commands/GetInputStreamCommand.java | 133 ++++++++++++++++-- .../remote/jsch/tests/FileStoreTests.java | 67 ++++++++- 4 files changed, 225 insertions(+), 31 deletions(-) diff --git a/bundles/org.eclipse.remote.jsch.core/src/org/eclipse/remote/internal/jsch/core/JSchConnection.java b/bundles/org.eclipse.remote.jsch.core/src/org/eclipse/remote/internal/jsch/core/JSchConnection.java index c9e2dc11e21..3ea70d3391e 100644 --- a/bundles/org.eclipse.remote.jsch.core/src/org/eclipse/remote/internal/jsch/core/JSchConnection.java +++ b/bundles/org.eclipse.remote.jsch.core/src/org/eclipse/remote/internal/jsch/core/JSchConnection.java @@ -257,7 +257,7 @@ public class JSchConnection implements IRemoteConnectionControlService, IRemoteC private final Map fProperties = new HashMap(); private final List fSessions = new ArrayList(); - private ChannelSftp fSftpChannel; + private ChannelSftp fSftpCommandChannel; private boolean isFullySetup; // including sftp channel and environment private static final Map connectionMap = new HashMap<>(); @@ -363,11 +363,11 @@ public class JSchConnection implements IRemoteConnectionControlService, IRemoteC } private synchronized void cleanup() { - if (fSftpChannel != null) { - if (fSftpChannel.isConnected()) { - fSftpChannel.disconnect(); + if (fSftpCommandChannel != null) { + if (fSftpCommandChannel.isConnected()) { + fSftpCommandChannel.disconnect(); } - fSftpChannel = null; + fSftpCommandChannel = null; } for (Session session : fSessions) { if (session.isConnected()) { @@ -677,24 +677,40 @@ public class JSchConnection implements IRemoteConnectionControlService, IRemoteC } /** - * Open an sftp channel to the remote host. Always use the second session if available. + * Open an sftp command channel to the remote host. This channel is for commands that do not require any + * state being preserved and should not be closed. Long running commands (such as get/put) should use a separate channel + * obtained via {#link #newSftpChannel()}. + * + * Always use the second session if available. * - * @return sftp channel or null if the progress monitor was cancelled + * @return sftp channel * @throws RemoteConnectionException * if a channel could not be opened */ - public ChannelSftp getSftpChannel() throws RemoteConnectionException { - if (fSftpChannel == null || fSftpChannel.isClosed()) { - Session session = fSessions.get(0); - if (fSessions.size() > 1) { - session = fSessions.get(1); - } - fSftpChannel = openSftpChannel(session); - if (fSftpChannel == null) { - throw new RemoteConnectionException(Messages.JSchConnection_Unable_to_open_sftp_channel); - } + public ChannelSftp getSftpCommandChannel() throws RemoteConnectionException { + if (fSftpCommandChannel == null || fSftpCommandChannel.isClosed()) { + fSftpCommandChannel = newSftpChannel(); } - return fSftpChannel; + return fSftpCommandChannel; + } + + /** + * Open a channel for long running commands. This channel should be closed when the command is completed. + * + * @return sftp channel + * @throws RemoteConnectionException + * if a channel could not be opened + */ + public ChannelSftp newSftpChannel() throws RemoteConnectionException { + Session session = fSessions.get(0); + if (fSessions.size() > 1) { + session = fSessions.get(1); + } + ChannelSftp channel = openSftpChannel(session); + if (channel == null) { + throw new RemoteConnectionException(Messages.JSchConnection_Unable_to_open_sftp_channel); + } + return channel; } public Channel getStreamForwarder(String host, int port) throws RemoteConnectionException { diff --git a/bundles/org.eclipse.remote.jsch.core/src/org/eclipse/remote/internal/jsch/core/commands/AbstractRemoteCommand.java b/bundles/org.eclipse.remote.jsch.core/src/org/eclipse/remote/internal/jsch/core/commands/AbstractRemoteCommand.java index 3d49cba56e0..8649e9a62d0 100755 --- a/bundles/org.eclipse.remote.jsch.core/src/org/eclipse/remote/internal/jsch/core/commands/AbstractRemoteCommand.java +++ b/bundles/org.eclipse.remote.jsch.core/src/org/eclipse/remote/internal/jsch/core/commands/AbstractRemoteCommand.java @@ -188,7 +188,7 @@ public abstract class AbstractRemoteCommand { private ChannelSftp fSftpChannel; private Future asyncCmdInThread() throws RemoteConnectionException { - setChannel(fConnection.getSftpChannel()); + setChannel(fConnection.getSftpCommandChannel()); return fPool.submit(this); } @@ -244,7 +244,6 @@ public abstract class AbstractRemoteCommand { if (e.getCause() instanceof SftpException) { throw (SftpException) e.getCause(); } - getChannel().disconnect(); throw new RemoteConnectionException(e.getMessage()); } getProgressMonitor().worked(1); @@ -253,7 +252,6 @@ public abstract class AbstractRemoteCommand { Thread.currentThread().interrupt(); // set current thread flag } future.cancel(true); - getChannel().disconnect(); throw new RemoteConnectionException(Messages.AbstractRemoteCommand_Operation_cancelled_by_user); } } diff --git a/bundles/org.eclipse.remote.jsch.core/src/org/eclipse/remote/internal/jsch/core/commands/GetInputStreamCommand.java b/bundles/org.eclipse.remote.jsch.core/src/org/eclipse/remote/internal/jsch/core/commands/GetInputStreamCommand.java index 9c3743d3afa..aa552348c26 100644 --- a/bundles/org.eclipse.remote.jsch.core/src/org/eclipse/remote/internal/jsch/core/commands/GetInputStreamCommand.java +++ b/bundles/org.eclipse.remote.jsch.core/src/org/eclipse/remote/internal/jsch/core/commands/GetInputStreamCommand.java @@ -11,12 +11,24 @@ import org.eclipse.remote.core.exception.RemoteConnectionException; import org.eclipse.remote.internal.jsch.core.JSchConnection; import org.eclipse.remote.internal.jsch.core.messages.Messages; +import com.jcraft.jsch.ChannelSftp; import com.jcraft.jsch.JSchException; import com.jcraft.jsch.SftpException; +/** + * The JSch implementation does not support multiple streams open on a single channel, so we must create a new channel for each + * subsequent stream. This has the problem that there are usually only a limited number of channels that can be opened + * simultaneously, so it is possible that this call will fail unless the open streams are closed first. + * + * This code will use the initial (command) channel first, or if that is already being used, will open a new stream. It must be + * careful not to close the command stream as other threads may still be using it. + */ public class GetInputStreamCommand extends AbstractRemoteCommand { private final IPath fRemotePath; + private static ChannelSftp commandChannel; + private ChannelSftp thisChannel; + public GetInputStreamCommand(JSchConnection connection, IPath path) { super(connection); fRemotePath = path; @@ -25,19 +37,124 @@ public class GetInputStreamCommand extends AbstractRemoteCommand { @Override public InputStream getResult(IProgressMonitor monitor) throws RemoteConnectionException { final SubMonitor subMon = SubMonitor.convert(monitor, 10); - SftpCallable c = new SftpCallable() { + + final SftpCallable c = new SftpCallable() { + private ChannelSftp newChannel() throws IOException { + synchronized (GetInputStreamCommand.class) { + if (commandChannel != null) { + try { + thisChannel = getConnection().newSftpChannel(); + return thisChannel; + } catch (RemoteConnectionException e) { + throw new IOException(e.getMessage()); + } + } + thisChannel = commandChannel = getChannel(); + return commandChannel; + } + } + @Override public InputStream call() throws JSchException, SftpException, IOException { - try { - return getConnection().getSftpChannel().get(fRemotePath.toString(), - new CommandProgressMonitor(NLS.bind(Messages.GetInputStreamCommand_Receiving, fRemotePath.toString()), getProgressMonitor())); - } catch (RemoteConnectionException e) { - throw new IOException(e.getMessage()); - } + return newChannel().get(fRemotePath.toString(), new CommandProgressMonitor( + NLS.bind(Messages.GetInputStreamCommand_Receiving, fRemotePath.toString()), getProgressMonitor())); } }; try { - return c.getResult(subMon.newChild(10)); + final InputStream stream = c.getResult(subMon.newChild(10)); + return new InputStream() { + @Override + public int read() throws IOException { + return stream.read(); + } + + /* + * (non-Javadoc) + * + * @see java.io.InputStream#close() + */ + @Override + public void close() throws IOException { + stream.close(); + synchronized (GetInputStreamCommand.class) { + if (thisChannel != commandChannel) { + thisChannel.disconnect(); + } else { + commandChannel = null; + } + } + } + + /* + * (non-Javadoc) + * + * @see java.io.InputStream#read(byte[]) + */ + @Override + public int read(byte[] b) throws IOException { + return stream.read(b); + } + + /* + * (non-Javadoc) + * + * @see java.io.InputStream#read(byte[], int, int) + */ + @Override + public int read(byte[] b, int off, int len) throws IOException { + return stream.read(b, off, len); + } + + /* + * (non-Javadoc) + * + * @see java.io.InputStream#skip(long) + */ + @Override + public long skip(long n) throws IOException { + return stream.skip(n); + } + + /* + * (non-Javadoc) + * + * @see java.io.InputStream#available() + */ + @Override + public int available() throws IOException { + return stream.available(); + } + + /* + * (non-Javadoc) + * + * @see java.io.InputStream#mark(int) + */ + @Override + public synchronized void mark(int readlimit) { + stream.mark(readlimit); + } + + /* + * (non-Javadoc) + * + * @see java.io.InputStream#reset() + */ + @Override + public synchronized void reset() throws IOException { + stream.reset(); + } + + /* + * (non-Javadoc) + * + * @see java.io.InputStream#markSupported() + */ + @Override + public boolean markSupported() { + return stream.markSupported(); + } + }; } catch (SftpException e) { throw new RemoteConnectionException(e.getMessage()); } diff --git a/tests/org.eclipse.remote.jsch.tests/src/org/eclipse/remote/jsch/tests/FileStoreTests.java b/tests/org.eclipse.remote.jsch.tests/src/org/eclipse/remote/jsch/tests/FileStoreTests.java index 8c1e3ed2504..62d0ddac2f4 100644 --- a/tests/org.eclipse.remote.jsch.tests/src/org/eclipse/remote/jsch/tests/FileStoreTests.java +++ b/tests/org.eclipse.remote.jsch.tests/src/org/eclipse/remote/jsch/tests/FileStoreTests.java @@ -9,8 +9,6 @@ import java.io.OutputStream; import java.io.OutputStreamWriter; import java.net.URI; -import junit.framework.TestCase; - import org.eclipse.core.filesystem.EFS; import org.eclipse.core.filesystem.IFileInfo; import org.eclipse.core.filesystem.IFileStore; @@ -24,6 +22,8 @@ import org.eclipse.remote.core.IRemoteFileService; import org.eclipse.remote.core.IRemoteServicesManager; import org.eclipse.remote.internal.jsch.core.JSchConnection; +import junit.framework.TestCase; + public class FileStoreTests extends TestCase { private static final String CONNECTION_NAME = "test_connection"; private static final String USERNAME = "test"; @@ -33,7 +33,9 @@ public class FileStoreTests extends TestCase { private static final String REMOTE_DIR = "/tmp/ptp_" + USERNAME + "/filestore_tests"; private static final String LOCAL_FILE = "local_file"; private static final String REMOTE_FILE = "remote_file"; + private static final String REMOTE_FILE2 = "remote_file2"; private static final String TEST_STRING = "a string containing fairly *()(*&^$%## random text"; + private static final String TEST_STRING2 = "a different string containing fairly *()(*&^$%## random text"; private IRemoteConnectionType fConnectionType; private IRemoteConnection fRemoteConnection; @@ -82,6 +84,67 @@ public class FileStoreTests extends TestCase { } } + public void testMultiStreams() { + IFileStore remoteFileStore = fRemoteDir.getChild(REMOTE_FILE); + IFileStore remoteFileStore2 = fRemoteDir.getChild(REMOTE_FILE2); + + try { + createFile(remoteFileStore, TEST_STRING); + createFile(remoteFileStore2, TEST_STRING2); + } catch (Exception e) { + fail(e.getMessage()); + } + + assertTrue(remoteFileStore.fetchInfo().exists()); + assertTrue(remoteFileStore2.fetchInfo().exists()); + + /* + * Check how many streams we can open + */ + InputStream streams[] = new InputStream[100]; + int streamCount = 0; + + for (; streamCount < streams.length; streamCount++) { + try { + streams[streamCount] = remoteFileStore.openInputStream(EFS.NONE, null); + } catch (Exception e) { + if (!e.getMessage().endsWith("channel is not opened.")) { + fail(e.getMessage()); + } + break; + } + } + + for (int i = 0; i < streamCount; i++) { + try { + streams[i].close(); + } catch (IOException e) { + // No need to deal with this + } + } + + for (int i = 0; i < streamCount / 2; i++) { + try { + InputStream stream = remoteFileStore.openInputStream(EFS.NONE, null); + assertNotNull(stream); + BufferedReader buf = new BufferedReader(new InputStreamReader(stream)); + String line = buf.readLine().trim(); + assertTrue(line.equals(TEST_STRING)); + + InputStream stream2 = remoteFileStore2.openInputStream(EFS.NONE, null); + assertNotNull(stream2); + BufferedReader buf2 = new BufferedReader(new InputStreamReader(stream2)); + String line2 = buf2.readLine().trim(); + assertTrue(line2.equals(TEST_STRING2)); + + stream.close(); + stream2.close(); + } catch (Exception e) { + fail(e.getMessage()); + } + } + } + public void testCopy() { final IFileStore localFileStore = fLocalDir.getChild(LOCAL_FILE); final IFileStore remoteFileStore = fRemoteDir.getChild(REMOTE_FILE);