mirror of
https://github.com/eclipse-cdt/cdt
synced 2025-07-27 19:05:38 +02:00
Bug 479433 - Add better support for multiple input streams.
Change-Id: I6234928c5ef6129cda3b77fd7e6467c0e817d7b4 Signed-off-by: Greg Watson <g.watson@computer.org>
This commit is contained in:
parent
1eb9e0a683
commit
b1e764a163
4 changed files with 225 additions and 31 deletions
|
@ -257,7 +257,7 @@ public class JSchConnection implements IRemoteConnectionControlService, IRemoteC
|
||||||
private final Map<String, String> fProperties = new HashMap<String, String>();
|
private final Map<String, String> fProperties = new HashMap<String, String>();
|
||||||
private final List<Session> fSessions = new ArrayList<Session>();
|
private final List<Session> fSessions = new ArrayList<Session>();
|
||||||
|
|
||||||
private ChannelSftp fSftpChannel;
|
private ChannelSftp fSftpCommandChannel;
|
||||||
private boolean isFullySetup; // including sftp channel and environment
|
private boolean isFullySetup; // including sftp channel and environment
|
||||||
|
|
||||||
private static final Map<IRemoteConnection, JSchConnection> connectionMap = new HashMap<>();
|
private static final Map<IRemoteConnection, JSchConnection> connectionMap = new HashMap<>();
|
||||||
|
@ -363,11 +363,11 @@ public class JSchConnection implements IRemoteConnectionControlService, IRemoteC
|
||||||
}
|
}
|
||||||
|
|
||||||
private synchronized void cleanup() {
|
private synchronized void cleanup() {
|
||||||
if (fSftpChannel != null) {
|
if (fSftpCommandChannel != null) {
|
||||||
if (fSftpChannel.isConnected()) {
|
if (fSftpCommandChannel.isConnected()) {
|
||||||
fSftpChannel.disconnect();
|
fSftpCommandChannel.disconnect();
|
||||||
}
|
}
|
||||||
fSftpChannel = null;
|
fSftpCommandChannel = null;
|
||||||
}
|
}
|
||||||
for (Session session : fSessions) {
|
for (Session session : fSessions) {
|
||||||
if (session.isConnected()) {
|
if (session.isConnected()) {
|
||||||
|
@ -677,24 +677,40 @@ public class JSchConnection implements IRemoteConnectionControlService, IRemoteC
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Open an sftp channel to the remote host. Always use the second session if available.
|
* Open an sftp command channel to the remote host. This channel is for commands that do not require any
|
||||||
|
* state being preserved and should not be closed. Long running commands (such as get/put) should use a separate channel
|
||||||
|
* obtained via {#link #newSftpChannel()}.
|
||||||
|
*
|
||||||
|
* Always use the second session if available.
|
||||||
*
|
*
|
||||||
* @return sftp channel or null if the progress monitor was cancelled
|
* @return sftp channel
|
||||||
* @throws RemoteConnectionException
|
* @throws RemoteConnectionException
|
||||||
* if a channel could not be opened
|
* if a channel could not be opened
|
||||||
*/
|
*/
|
||||||
public ChannelSftp getSftpChannel() throws RemoteConnectionException {
|
public ChannelSftp getSftpCommandChannel() throws RemoteConnectionException {
|
||||||
if (fSftpChannel == null || fSftpChannel.isClosed()) {
|
if (fSftpCommandChannel == null || fSftpCommandChannel.isClosed()) {
|
||||||
Session session = fSessions.get(0);
|
fSftpCommandChannel = newSftpChannel();
|
||||||
if (fSessions.size() > 1) {
|
|
||||||
session = fSessions.get(1);
|
|
||||||
}
|
|
||||||
fSftpChannel = openSftpChannel(session);
|
|
||||||
if (fSftpChannel == null) {
|
|
||||||
throw new RemoteConnectionException(Messages.JSchConnection_Unable_to_open_sftp_channel);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return fSftpChannel;
|
return fSftpCommandChannel;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Open a channel for long running commands. This channel should be closed when the command is completed.
|
||||||
|
*
|
||||||
|
* @return sftp channel
|
||||||
|
* @throws RemoteConnectionException
|
||||||
|
* if a channel could not be opened
|
||||||
|
*/
|
||||||
|
public ChannelSftp newSftpChannel() throws RemoteConnectionException {
|
||||||
|
Session session = fSessions.get(0);
|
||||||
|
if (fSessions.size() > 1) {
|
||||||
|
session = fSessions.get(1);
|
||||||
|
}
|
||||||
|
ChannelSftp channel = openSftpChannel(session);
|
||||||
|
if (channel == null) {
|
||||||
|
throw new RemoteConnectionException(Messages.JSchConnection_Unable_to_open_sftp_channel);
|
||||||
|
}
|
||||||
|
return channel;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Channel getStreamForwarder(String host, int port) throws RemoteConnectionException {
|
public Channel getStreamForwarder(String host, int port) throws RemoteConnectionException {
|
||||||
|
|
|
@ -188,7 +188,7 @@ public abstract class AbstractRemoteCommand<T> {
|
||||||
private ChannelSftp fSftpChannel;
|
private ChannelSftp fSftpChannel;
|
||||||
|
|
||||||
private Future<T1> asyncCmdInThread() throws RemoteConnectionException {
|
private Future<T1> asyncCmdInThread() throws RemoteConnectionException {
|
||||||
setChannel(fConnection.getSftpChannel());
|
setChannel(fConnection.getSftpCommandChannel());
|
||||||
return fPool.submit(this);
|
return fPool.submit(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -244,7 +244,6 @@ public abstract class AbstractRemoteCommand<T> {
|
||||||
if (e.getCause() instanceof SftpException) {
|
if (e.getCause() instanceof SftpException) {
|
||||||
throw (SftpException) e.getCause();
|
throw (SftpException) e.getCause();
|
||||||
}
|
}
|
||||||
getChannel().disconnect();
|
|
||||||
throw new RemoteConnectionException(e.getMessage());
|
throw new RemoteConnectionException(e.getMessage());
|
||||||
}
|
}
|
||||||
getProgressMonitor().worked(1);
|
getProgressMonitor().worked(1);
|
||||||
|
@ -253,7 +252,6 @@ public abstract class AbstractRemoteCommand<T> {
|
||||||
Thread.currentThread().interrupt(); // set current thread flag
|
Thread.currentThread().interrupt(); // set current thread flag
|
||||||
}
|
}
|
||||||
future.cancel(true);
|
future.cancel(true);
|
||||||
getChannel().disconnect();
|
|
||||||
throw new RemoteConnectionException(Messages.AbstractRemoteCommand_Operation_cancelled_by_user);
|
throw new RemoteConnectionException(Messages.AbstractRemoteCommand_Operation_cancelled_by_user);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -11,12 +11,24 @@ import org.eclipse.remote.core.exception.RemoteConnectionException;
|
||||||
import org.eclipse.remote.internal.jsch.core.JSchConnection;
|
import org.eclipse.remote.internal.jsch.core.JSchConnection;
|
||||||
import org.eclipse.remote.internal.jsch.core.messages.Messages;
|
import org.eclipse.remote.internal.jsch.core.messages.Messages;
|
||||||
|
|
||||||
|
import com.jcraft.jsch.ChannelSftp;
|
||||||
import com.jcraft.jsch.JSchException;
|
import com.jcraft.jsch.JSchException;
|
||||||
import com.jcraft.jsch.SftpException;
|
import com.jcraft.jsch.SftpException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The JSch implementation does not support multiple streams open on a single channel, so we must create a new channel for each
|
||||||
|
* subsequent stream. This has the problem that there are usually only a limited number of channels that can be opened
|
||||||
|
* simultaneously, so it is possible that this call will fail unless the open streams are closed first.
|
||||||
|
*
|
||||||
|
* This code will use the initial (command) channel first, or if that is already being used, will open a new stream. It must be
|
||||||
|
* careful not to close the command stream as other threads may still be using it.
|
||||||
|
*/
|
||||||
public class GetInputStreamCommand extends AbstractRemoteCommand<InputStream> {
|
public class GetInputStreamCommand extends AbstractRemoteCommand<InputStream> {
|
||||||
private final IPath fRemotePath;
|
private final IPath fRemotePath;
|
||||||
|
|
||||||
|
private static ChannelSftp commandChannel;
|
||||||
|
private ChannelSftp thisChannel;
|
||||||
|
|
||||||
public GetInputStreamCommand(JSchConnection connection, IPath path) {
|
public GetInputStreamCommand(JSchConnection connection, IPath path) {
|
||||||
super(connection);
|
super(connection);
|
||||||
fRemotePath = path;
|
fRemotePath = path;
|
||||||
|
@ -25,19 +37,124 @@ public class GetInputStreamCommand extends AbstractRemoteCommand<InputStream> {
|
||||||
@Override
|
@Override
|
||||||
public InputStream getResult(IProgressMonitor monitor) throws RemoteConnectionException {
|
public InputStream getResult(IProgressMonitor monitor) throws RemoteConnectionException {
|
||||||
final SubMonitor subMon = SubMonitor.convert(monitor, 10);
|
final SubMonitor subMon = SubMonitor.convert(monitor, 10);
|
||||||
SftpCallable<InputStream> c = new SftpCallable<InputStream>() {
|
|
||||||
|
final SftpCallable<InputStream> c = new SftpCallable<InputStream>() {
|
||||||
|
private ChannelSftp newChannel() throws IOException {
|
||||||
|
synchronized (GetInputStreamCommand.class) {
|
||||||
|
if (commandChannel != null) {
|
||||||
|
try {
|
||||||
|
thisChannel = getConnection().newSftpChannel();
|
||||||
|
return thisChannel;
|
||||||
|
} catch (RemoteConnectionException e) {
|
||||||
|
throw new IOException(e.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
thisChannel = commandChannel = getChannel();
|
||||||
|
return commandChannel;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public InputStream call() throws JSchException, SftpException, IOException {
|
public InputStream call() throws JSchException, SftpException, IOException {
|
||||||
try {
|
return newChannel().get(fRemotePath.toString(), new CommandProgressMonitor(
|
||||||
return getConnection().getSftpChannel().get(fRemotePath.toString(),
|
NLS.bind(Messages.GetInputStreamCommand_Receiving, fRemotePath.toString()), getProgressMonitor()));
|
||||||
new CommandProgressMonitor(NLS.bind(Messages.GetInputStreamCommand_Receiving, fRemotePath.toString()), getProgressMonitor()));
|
|
||||||
} catch (RemoteConnectionException e) {
|
|
||||||
throw new IOException(e.getMessage());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
try {
|
try {
|
||||||
return c.getResult(subMon.newChild(10));
|
final InputStream stream = c.getResult(subMon.newChild(10));
|
||||||
|
return new InputStream() {
|
||||||
|
@Override
|
||||||
|
public int read() throws IOException {
|
||||||
|
return stream.read();
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* (non-Javadoc)
|
||||||
|
*
|
||||||
|
* @see java.io.InputStream#close()
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void close() throws IOException {
|
||||||
|
stream.close();
|
||||||
|
synchronized (GetInputStreamCommand.class) {
|
||||||
|
if (thisChannel != commandChannel) {
|
||||||
|
thisChannel.disconnect();
|
||||||
|
} else {
|
||||||
|
commandChannel = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* (non-Javadoc)
|
||||||
|
*
|
||||||
|
* @see java.io.InputStream#read(byte[])
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public int read(byte[] b) throws IOException {
|
||||||
|
return stream.read(b);
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* (non-Javadoc)
|
||||||
|
*
|
||||||
|
* @see java.io.InputStream#read(byte[], int, int)
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public int read(byte[] b, int off, int len) throws IOException {
|
||||||
|
return stream.read(b, off, len);
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* (non-Javadoc)
|
||||||
|
*
|
||||||
|
* @see java.io.InputStream#skip(long)
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public long skip(long n) throws IOException {
|
||||||
|
return stream.skip(n);
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* (non-Javadoc)
|
||||||
|
*
|
||||||
|
* @see java.io.InputStream#available()
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public int available() throws IOException {
|
||||||
|
return stream.available();
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* (non-Javadoc)
|
||||||
|
*
|
||||||
|
* @see java.io.InputStream#mark(int)
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public synchronized void mark(int readlimit) {
|
||||||
|
stream.mark(readlimit);
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* (non-Javadoc)
|
||||||
|
*
|
||||||
|
* @see java.io.InputStream#reset()
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public synchronized void reset() throws IOException {
|
||||||
|
stream.reset();
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* (non-Javadoc)
|
||||||
|
*
|
||||||
|
* @see java.io.InputStream#markSupported()
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public boolean markSupported() {
|
||||||
|
return stream.markSupported();
|
||||||
|
}
|
||||||
|
};
|
||||||
} catch (SftpException e) {
|
} catch (SftpException e) {
|
||||||
throw new RemoteConnectionException(e.getMessage());
|
throw new RemoteConnectionException(e.getMessage());
|
||||||
}
|
}
|
||||||
|
|
|
@ -9,8 +9,6 @@ import java.io.OutputStream;
|
||||||
import java.io.OutputStreamWriter;
|
import java.io.OutputStreamWriter;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
|
|
||||||
import junit.framework.TestCase;
|
|
||||||
|
|
||||||
import org.eclipse.core.filesystem.EFS;
|
import org.eclipse.core.filesystem.EFS;
|
||||||
import org.eclipse.core.filesystem.IFileInfo;
|
import org.eclipse.core.filesystem.IFileInfo;
|
||||||
import org.eclipse.core.filesystem.IFileStore;
|
import org.eclipse.core.filesystem.IFileStore;
|
||||||
|
@ -24,6 +22,8 @@ import org.eclipse.remote.core.IRemoteFileService;
|
||||||
import org.eclipse.remote.core.IRemoteServicesManager;
|
import org.eclipse.remote.core.IRemoteServicesManager;
|
||||||
import org.eclipse.remote.internal.jsch.core.JSchConnection;
|
import org.eclipse.remote.internal.jsch.core.JSchConnection;
|
||||||
|
|
||||||
|
import junit.framework.TestCase;
|
||||||
|
|
||||||
public class FileStoreTests extends TestCase {
|
public class FileStoreTests extends TestCase {
|
||||||
private static final String CONNECTION_NAME = "test_connection";
|
private static final String CONNECTION_NAME = "test_connection";
|
||||||
private static final String USERNAME = "test";
|
private static final String USERNAME = "test";
|
||||||
|
@ -33,7 +33,9 @@ public class FileStoreTests extends TestCase {
|
||||||
private static final String REMOTE_DIR = "/tmp/ptp_" + USERNAME + "/filestore_tests";
|
private static final String REMOTE_DIR = "/tmp/ptp_" + USERNAME + "/filestore_tests";
|
||||||
private static final String LOCAL_FILE = "local_file";
|
private static final String LOCAL_FILE = "local_file";
|
||||||
private static final String REMOTE_FILE = "remote_file";
|
private static final String REMOTE_FILE = "remote_file";
|
||||||
|
private static final String REMOTE_FILE2 = "remote_file2";
|
||||||
private static final String TEST_STRING = "a string containing fairly *()(*&^$%## random text";
|
private static final String TEST_STRING = "a string containing fairly *()(*&^$%## random text";
|
||||||
|
private static final String TEST_STRING2 = "a different string containing fairly *()(*&^$%## random text";
|
||||||
|
|
||||||
private IRemoteConnectionType fConnectionType;
|
private IRemoteConnectionType fConnectionType;
|
||||||
private IRemoteConnection fRemoteConnection;
|
private IRemoteConnection fRemoteConnection;
|
||||||
|
@ -82,6 +84,67 @@ public class FileStoreTests extends TestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testMultiStreams() {
|
||||||
|
IFileStore remoteFileStore = fRemoteDir.getChild(REMOTE_FILE);
|
||||||
|
IFileStore remoteFileStore2 = fRemoteDir.getChild(REMOTE_FILE2);
|
||||||
|
|
||||||
|
try {
|
||||||
|
createFile(remoteFileStore, TEST_STRING);
|
||||||
|
createFile(remoteFileStore2, TEST_STRING2);
|
||||||
|
} catch (Exception e) {
|
||||||
|
fail(e.getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
|
assertTrue(remoteFileStore.fetchInfo().exists());
|
||||||
|
assertTrue(remoteFileStore2.fetchInfo().exists());
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Check how many streams we can open
|
||||||
|
*/
|
||||||
|
InputStream streams[] = new InputStream[100];
|
||||||
|
int streamCount = 0;
|
||||||
|
|
||||||
|
for (; streamCount < streams.length; streamCount++) {
|
||||||
|
try {
|
||||||
|
streams[streamCount] = remoteFileStore.openInputStream(EFS.NONE, null);
|
||||||
|
} catch (Exception e) {
|
||||||
|
if (!e.getMessage().endsWith("channel is not opened.")) {
|
||||||
|
fail(e.getMessage());
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int i = 0; i < streamCount; i++) {
|
||||||
|
try {
|
||||||
|
streams[i].close();
|
||||||
|
} catch (IOException e) {
|
||||||
|
// No need to deal with this
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int i = 0; i < streamCount / 2; i++) {
|
||||||
|
try {
|
||||||
|
InputStream stream = remoteFileStore.openInputStream(EFS.NONE, null);
|
||||||
|
assertNotNull(stream);
|
||||||
|
BufferedReader buf = new BufferedReader(new InputStreamReader(stream));
|
||||||
|
String line = buf.readLine().trim();
|
||||||
|
assertTrue(line.equals(TEST_STRING));
|
||||||
|
|
||||||
|
InputStream stream2 = remoteFileStore2.openInputStream(EFS.NONE, null);
|
||||||
|
assertNotNull(stream2);
|
||||||
|
BufferedReader buf2 = new BufferedReader(new InputStreamReader(stream2));
|
||||||
|
String line2 = buf2.readLine().trim();
|
||||||
|
assertTrue(line2.equals(TEST_STRING2));
|
||||||
|
|
||||||
|
stream.close();
|
||||||
|
stream2.close();
|
||||||
|
} catch (Exception e) {
|
||||||
|
fail(e.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public void testCopy() {
|
public void testCopy() {
|
||||||
final IFileStore localFileStore = fLocalDir.getChild(LOCAL_FILE);
|
final IFileStore localFileStore = fLocalDir.getChild(LOCAL_FILE);
|
||||||
final IFileStore remoteFileStore = fRemoteDir.getChild(REMOTE_FILE);
|
final IFileStore remoteFileStore = fRemoteDir.getChild(REMOTE_FILE);
|
||||||
|
|
Loading…
Add table
Reference in a new issue