1
0
Fork 0
mirror of https://github.com/eclipse-cdt/cdt synced 2025-04-29 19:45:01 +02:00

Bug 310345 - [concurrent] Asynchronous Cache Programming Model (ACPM) utilities for DSF

This commit is contained in:
Pawel Piech 2010-10-18 16:50:06 +00:00
parent 400a4127fd
commit c6e0fac759
7 changed files with 1353 additions and 16 deletions

View file

@ -0,0 +1,324 @@
package org.eclipse.cdt.dsf.concurrent;
/*******************************************************************************
* Copyright (c) 2008 Wind River Systems, Inc. and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Wind River Systems - initial API and implementation
*******************************************************************************/
import java.util.ArrayList;
import java.util.List;
import org.eclipse.cdt.dsf.internal.DsfPlugin;
import org.eclipse.core.runtime.IStatus;
import org.eclipse.core.runtime.Status;
/**
* A base implementation of a general purpose cache. Sub classes must implement
* {@link #retrieve(DataRequestMonitor)} to fetch data from the data source.
* Sub-classes or clients are also responsible for calling {@link #disable()}
* and {@link #reset()} to manage the state of the cache in response to events
* from the data source.
* <p>
* This cache requires an executor to use. The executor is used to synchronize
* access to the cache state and data.
* </p>
* @since 2.2
*/
@ConfinedToDsfExecutor("fExecutor")
public abstract class AbstractCache<V> implements ICache<V> {
private static final IStatus INVALID_STATUS = new Status(IStatus.ERROR, DsfPlugin.PLUGIN_ID, IDsfStatusConstants.INVALID_STATE, "Cache invalid", null); //$NON-NLS-1$
private static final IStatus DISABLED_STATUS = new Status(IStatus.ERROR, DsfPlugin.PLUGIN_ID, IDsfStatusConstants.INVALID_STATE, "Cache disabled", null); //$NON-NLS-1$
private class RequestCanceledListener implements RequestMonitor.ICanceledListener {
public void requestCanceled(final RequestMonitor canceledRm) {
fExecutor.execute(new Runnable() {
public void run() {
handleCanceledRm(canceledRm);
}
});
}
};
private RequestCanceledListener fRequestCanceledListener = new RequestCanceledListener();
private boolean fValid;
private V fData;
private IStatus fStatus = INVALID_STATUS;
@ThreadSafe
private Object fWaitingList;
private final ImmediateInDsfExecutor fExecutor;
public AbstractCache(ImmediateInDsfExecutor executor) {
fExecutor = executor;
}
public DsfExecutor getExecutor() {
return fExecutor.getDsfExecutor();
}
protected ImmediateInDsfExecutor getImmediateInDsfExecutor() {
return fExecutor;
}
/**
* Sub-classes should override this method to retrieve the cache data
* from its source.
*
* @param rm Request monitor for completion of data retrieval.
*/
abstract protected void retrieve();
/**
* Called while holding a lock to "this". No new request will start until
* this call returns.
*/
@ThreadSafe
abstract protected void canceled();
public boolean isValid() {
return fValid;
}
public V getData() {
return fData;
}
public IStatus getStatus() {
return fStatus;
}
public void request(final DataRequestMonitor<V> rm) {
wait(rm);
}
public void wait(RequestMonitor rm) {
assert fExecutor.getDsfExecutor().isInExecutorThread();
if (!fValid) {
boolean first = false;
synchronized (this) {
if (fWaitingList == null) {
first = true;
fWaitingList = rm;
} else if (fWaitingList instanceof RequestMonitor[]) {
RequestMonitor[] waitingList = (RequestMonitor[])fWaitingList;
int waitingListLength = waitingList.length;
int i;
for (i = 0; i < waitingListLength; i++) {
if (waitingList[i] == null) {
waitingList[i] = rm;
break;
}
}
if (i == waitingListLength) {
RequestMonitor[] newWaitingList = new RequestMonitor[waitingListLength + 1];
System.arraycopy(waitingList, 0, newWaitingList, 0, waitingListLength);
newWaitingList[waitingListLength] = rm;
fWaitingList = newWaitingList;
}
} else {
RequestMonitor[] newWaitingList = new RequestMonitor[2];
newWaitingList[0] = (RequestMonitor)fWaitingList;
newWaitingList[1] = rm;
fWaitingList = newWaitingList;
}
}
rm.addCancelListener(fRequestCanceledListener);
if (first) {
retrieve();
}
} else {
if (rm instanceof DataRequestMonitor<?>) {
@SuppressWarnings("unchecked")
DataRequestMonitor<V> drm = (DataRequestMonitor<V>)rm;
drm.setData(fData);
}
rm.setStatus(fStatus);
rm.done();
}
}
private void doSet(V data, IStatus status, boolean valid) {
assert fExecutor.getDsfExecutor().isInExecutorThread();
fData = data;
fStatus = status;
fValid = valid;
Object waiting = null;
synchronized(this) {
waiting = fWaitingList;
fWaitingList = null;
}
if (waiting != null) {
if (waiting instanceof RequestMonitor) {
completeWaitingRm((RequestMonitor)waiting);
} else if (waiting instanceof RequestMonitor[]) {
RequestMonitor[] waitingList = (RequestMonitor[])waiting;
for (int i = 0; i < waitingList.length; i++) {
if (waitingList[i] != null) {
completeWaitingRm(waitingList[i]);
}
}
}
waiting = null;
}
}
private void completeWaitingRm(RequestMonitor rm) {
if (rm instanceof DataRequestMonitor<?>) {
@SuppressWarnings("unchecked")
DataRequestMonitor<V> drm = (DataRequestMonitor<V>)rm;
drm.setData(fData);
}
rm.setStatus(fStatus);
rm.removeCancelListener(fRequestCanceledListener);
rm.done();
}
private void handleCanceledRm(final RequestMonitor rm) {
boolean found = false;
boolean waiting = false;
synchronized (this) {
if (rm.equals(fWaitingList)) {
found = true;
waiting = false;
fWaitingList = null;
} else if(fWaitingList instanceof RequestMonitor[]) {
RequestMonitor[] waitingList = (RequestMonitor[])fWaitingList;
for (int i = 0; i < waitingList.length; i++) {
if (!found && rm.equals(waitingList[i])) {
waitingList[i] = null;
found = true;
}
waiting = waiting || waitingList[i] != null;
}
}
if (/*found && */!waiting) {
canceled();
}
}
// If we have no clients waiting anymore, cancel the request
if (found) {
// We no longer need to listen to cancelations.
rm.removeCancelListener(fRequestCanceledListener);
rm.setStatus(Status.CANCEL_STATUS);
rm.done();
}
}
@ThreadSafe
protected boolean isCanceled() {
boolean canceled;
List<RequestMonitor> canceledRms = null;
synchronized (this) {
if (fWaitingList instanceof RequestMonitor && ((RequestMonitor)fWaitingList).isCanceled()) {
canceledRms = new ArrayList<RequestMonitor>(1);
canceledRms.add((RequestMonitor)fWaitingList);
fWaitingList = null;
} else if(fWaitingList instanceof RequestMonitor[]) {
boolean waiting = false;
RequestMonitor[] waitingList = (RequestMonitor[])fWaitingList;
for (int i = 0; i < waitingList.length; i++) {
if (waitingList[i] != null && waitingList[i].isCanceled()) {
if (canceledRms == null) {
canceledRms = new ArrayList<RequestMonitor>(1);
}
canceledRms.add( waitingList[i] );
waitingList[i] = null;
}
waiting = waiting || waitingList[i] != null;
}
if (!waiting) {
fWaitingList = null;
}
}
canceled = fWaitingList == null;
}
if (canceledRms != null) {
for (RequestMonitor canceledRm : canceledRms) {
canceledRm.setStatus(Status.CANCEL_STATUS);
canceledRm.removeCancelListener(fRequestCanceledListener);
canceledRm.done();
}
}
return canceled;
}
/**
* Resets the cache with a data value <code>null</code> and an error
* status with code {@link IDsfStatusConstants#INVALID_STATE}.
*
* @see #reset(Object, IStatus)
*/
protected void reset() {
reset(null, INVALID_STATUS);
}
/**
* Resets the cache with given data and status. Resetting the cache
* forces the cache to be invalid and cancels any current pending requests
* from data source.
* <p>
* This method should be called when the data source has issued an event
* indicating that the source data has changed but data may still be
* retrieved. Clients may need to re-request data following cache reset.
* </p>
* @param data The data that should be returned to any clients currently
* waiting for cache data.
* @status The status that should be returned to any clients currently
* waiting for cache data.
*/
protected void reset(V data, IStatus status) {
doSet(data, status, false);
}
/**
* Disables the cache from retrieving data from the source. If the cache
* is already valid the data and status is retained. If the cache is not
* valid, then data value <code>null</code> and an error status with code
* {@link IDsfStatusConstants#INVALID_STATE} are set.
*
* @see #set(Object, IStatus)
*/
protected void disable() {
if (!fValid) {
set(null, DISABLED_STATUS);
}
}
/**
* Resets the cache then disables it. When a cache is disabled it means
* that it is valid and requests to the data source will not be sent.
* <p>
* This method should be called when the data source has issued an event
* indicating that the source data has changed and future requests for
* data will return the given data and status. Once the source data
* becomes available again, clients should call {@link #reset()}.
* </p>
* @param data The data that should be returned to any clients waiting for
* cache data and for clients requesting data until the cache is reset again.
* @status The status that should be returned to any clients waiting for
* cache data and for clients requesting data until the cache is reset again.
*
* @see #reset(Object, IStatus)
*/
protected void set(V data, IStatus status) {
doSet(data, status, true);
}
}

View file

@ -0,0 +1,69 @@
/*******************************************************************************
* Copyright (c) 2010 Wind River Systems and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Wind River Systems - initial API and implementation
*******************************************************************************/
package org.eclipse.cdt.dsf.concurrent;
import org.eclipse.core.runtime.IStatus;
/**
* The interface for a general purpose cache that caches the result of a single
* request. Implementations need to provide the logic to fetch data from an
* asynchronous data source.
* <p>
* This cache requires an executor to use. The executor is used to synchronize
* access to the cache state and data.
* </p>
* @since 2.2
*/
@ConfinedToDsfExecutor("getExecutor()")
public interface ICache<V> {
/**
* The executor that must be used to access this cache.
*/
public DsfExecutor getExecutor();
/**
* Returns the current data value held by this cache. Clients should first
* call isValid() to determine if the data is up to date.
*/
public V getData();
/**
* Returns the status of the source request held by this cache. Clients
* should first call isValid() to determine if the data is up to date.
*/
public IStatus getStatus();
/**
* Wait for the cache to become valid. If the cache is valid already, the
* request returns immediately, otherwise data will first be retrieved from the
* source.
*
* @param rm RequestMonitor that is called when cache becomes valid.
*/
public void wait(RequestMonitor rm);
/**
* Request data from the cache. The cache is valid, it will complete the
* request immediately, otherwise data will first be retrieved from the
* source.
*
* @param rm RequestMonitor that is called when cache becomes valid.
*/
public void request(DataRequestMonitor<V> rm);
/**
* Returns <code>true</code> if the cache is currently valid. I.e.
* whether the cache can return a value immediately without first
* retrieving it from the data source.
*/
public boolean isValid();
}

View file

@ -0,0 +1,44 @@
/*******************************************************************************
* Copyright (c) 2008 Wind River Systems and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Wind River Systems - initial API and implementation
*******************************************************************************/
package org.eclipse.cdt.dsf.concurrent;
import java.util.concurrent.Executor;
/**
* @since 2.2
*
*/
public class ImmediateInDsfExecutor implements Executor {
final private DsfExecutor fDsfExecutor;
public DsfExecutor getDsfExecutor() {
return fDsfExecutor;
}
public ImmediateInDsfExecutor(DsfExecutor dsfExecutor) {
fDsfExecutor = dsfExecutor;
}
public void execute(final Runnable command) {
if (fDsfExecutor.isInExecutorThread()) {
command.run();
} else {
fDsfExecutor.execute(new DsfRunnable() {
public void run() {
command.run();
}
});
}
}
}

View file

@ -24,7 +24,7 @@ import org.eclipse.core.runtime.Status;
* This cache requires an executor to use. The executor is used to synchronize
* access to the cache state and data.
* </p>
* @since 2.1
* @since 2.2
*/
@ConfinedToDsfExecutor("fExecutor")
public abstract class RequestCache<V> extends AbstractCache<V> {
@ -36,21 +36,15 @@ public abstract class RequestCache<V> extends AbstractCache<V> {
super(executor);
}
/**
* Sub-classes should override this method to retrieve the cache data
* from its source.
*
* @param rm Request monitor for completion of data retrieval.
*/
@Override
protected void retrieve() {
protected final void retrieve() {
// Make sure to cancel the previous rm. This may lead to the rm being
// canceled twice, but that's not harmful.
if (fRm != null) {
fRm.cancel();
}
fRm = new DataRequestMonitor<V>(getExecutor(), null) {
fRm = new DataRequestMonitor<V>(getImmediateInDsfExecutor(), null) {
private IStatus fRawStatus = Status.OK_STATUS;
@ -95,7 +89,7 @@ public abstract class RequestCache<V> extends AbstractCache<V> {
}
@Override
public void reset(V data, IStatus status) {
protected void reset(V data, IStatus status) {
if (fRm != null) {
fRm.cancel();
fRm = null;
@ -104,7 +98,7 @@ public abstract class RequestCache<V> extends AbstractCache<V> {
}
@Override
public void disable() {
protected void disable() {
if (fRm != null) {
fRm.cancel();
fRm = null;
@ -113,7 +107,7 @@ public abstract class RequestCache<V> extends AbstractCache<V> {
}
@Override
public void set(V data, IStatus status) {
protected void set(V data, IStatus status) {
if (fRm != null) {
fRm.cancel();
fRm = null;

View file

@ -122,7 +122,7 @@ public abstract class Transaction<V> {
* @throws CoreException
* if an error was encountered getting the data from the source
*/
protected void validate(RequestCache<?> cache) throws InvalidCacheException, CoreException {
protected void validate(ICache<?> cache) throws InvalidCacheException, CoreException {
if (cache.isValid()) {
if (!cache.getStatus().isOK()) {
throw new CoreException(cache.getStatus());
@ -141,15 +141,22 @@ public abstract class Transaction<V> {
}
}
/**
* See {@link #validate(RequestCache)}. This variant simply validates
* multiple cache objects.
*/
protected void validate(RequestCache<?> ... caches) throws InvalidCacheException, CoreException {
}
/**
* See {@link #validate(RequestCache)}. This variant simply validates
* multiple cache objects.
*/
protected void validate(RequestCache<?> ... caches) throws InvalidCacheException, CoreException {
protected void validate(Iterable<ICache<?>> caches) throws InvalidCacheException, CoreException {
// Check if any of the caches have errors:
boolean allValid = true;
for (RequestCache<?> cache : caches) {
for (ICache<?> cache : caches) {
if (cache.isValid()) {
if (!cache.getStatus().isOK()) {
throw new CoreException(cache.getStatus());
@ -169,7 +176,7 @@ public abstract class Transaction<V> {
}
};
int count = 0;
for (RequestCache<?> cache : caches) {
for (ICache<?> cache : caches) {
if (!cache.isValid()) {
cache.wait(countringRm);
count++;

View file

@ -0,0 +1,721 @@
/*******************************************************************************
* Copyright (c) 2006 Wind River Systems and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Wind River Systems - initial API and implementation
*******************************************************************************/
package org.eclipse.cdt.tests.dsf.concurrent;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import junit.framework.Assert;
import org.eclipse.cdt.dsf.concurrent.DataRequestMonitor;
import org.eclipse.cdt.dsf.concurrent.DsfRunnable;
import org.eclipse.cdt.dsf.concurrent.IDsfStatusConstants;
import org.eclipse.cdt.dsf.concurrent.ImmediateExecutor;
import org.eclipse.cdt.dsf.concurrent.ImmediateInDsfExecutor;
import org.eclipse.cdt.dsf.concurrent.Query;
import org.eclipse.cdt.dsf.concurrent.RequestCache;
import org.eclipse.cdt.tests.dsf.TestDsfExecutor;
import org.eclipse.core.runtime.IStatus;
import org.eclipse.core.runtime.Status;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
/**
* Tests that exercise the DataCache object.
*/
public class CacheTests {
TestDsfExecutor fExecutor;
TestCache fTestCache;
DataRequestMonitor<Integer> fRetrieveRm;
class TestCache extends RequestCache<Integer> {
public TestCache() {
super(new ImmediateInDsfExecutor(fExecutor));
}
@Override
protected void retrieve(DataRequestMonitor<Integer> rm) {
synchronized(CacheTests.this) {
fRetrieveRm = rm;
CacheTests.this.notifyAll();
}
}
@Override
protected void reset() {
super.reset();
}
@Override
public void reset(Integer data, IStatus status) {
super.reset(data, status);
}
@Override
public void disable() {
super.disable();
}
@Override
public void set(Integer data, IStatus status) {
super.set(data, status);
}
}
/**
* There's no rule on how quickly the cache has to start data retrieval
* after it has been requested. It could do it immediately, or it could
* wait a dispatch cycle, etc..
*/
private void waitForRetrieveRm() {
synchronized(this) {
while (fRetrieveRm == null) {
try {
wait();
} catch (InterruptedException e) {
return;
}
}
}
}
@Before
public void startExecutor() throws ExecutionException, InterruptedException {
fExecutor = new TestDsfExecutor();
fTestCache = new TestCache();
}
@After
public void shutdownExecutor() throws ExecutionException, InterruptedException {
fExecutor.submit(new DsfRunnable() { public void run() {
fExecutor.shutdown();
}}).get();
if (fExecutor.exceptionsCaught()) {
Throwable[] exceptions = fExecutor.getExceptions();
throw new ExecutionException(exceptions[0]);
}
fRetrieveRm = null;
fTestCache = null;
fExecutor = null;
}
private void assertCacheValidWithData(Object data) {
Assert.assertTrue(fTestCache.isValid());
Assert.assertEquals(data, fTestCache.getData());
Assert.assertTrue(fTestCache.getStatus().isOK());
}
private void assertCacheResetWithoutData() {
Assert.assertFalse(fTestCache.isValid());
Assert.assertEquals(null, fTestCache.getData());
Assert.assertFalse(fTestCache.getStatus().isOK());
Assert.assertEquals(fTestCache.getStatus().getCode(), IDsfStatusConstants.INVALID_STATE);
}
private void assertCacheDisabledWithoutData() {
Assert.assertTrue(fTestCache.isValid());
Assert.assertEquals(null, fTestCache.getData());
Assert.assertFalse(fTestCache.getStatus().isOK());
Assert.assertEquals(fTestCache.getStatus().getCode(), IDsfStatusConstants.INVALID_STATE);
}
private void assertCacheWaiting() {
Assert.assertFalse(fTestCache.isValid());
Assert.assertEquals(null, fTestCache.getData());
Assert.assertFalse(fTestCache.getStatus().isOK());
Assert.assertEquals(fTestCache.getStatus().getCode(), IDsfStatusConstants.INVALID_STATE);
Assert.assertFalse(fRetrieveRm.isCanceled());
}
private void assertCacheCanceled() {
Assert.assertFalse(fTestCache.isValid());
Assert.assertEquals(null, fTestCache.getData());
Assert.assertFalse(fTestCache.getStatus().isOK());
Assert.assertEquals(fTestCache.getStatus().getCode(), IDsfStatusConstants.INVALID_STATE);
Assert.assertTrue(fRetrieveRm.isCanceled());
}
@Test
public void getWithCompletionInDsfThreadTest() throws InterruptedException, ExecutionException {
// Request data from cache
Query<Integer> q = new Query<Integer>() {
@Override
protected void execute(DataRequestMonitor<Integer> rm) {
fTestCache.request(rm);
}
};
// Check initial state
Assert.assertFalse(fTestCache.isValid());
Assert.assertFalse(fTestCache.getStatus().isOK());
Assert.assertEquals(fTestCache.getStatus().getCode(), IDsfStatusConstants.INVALID_STATE);
fExecutor.execute(q);
// Wait until the cache requests the data.
waitForRetrieveRm();
// Check state while waiting for data
Assert.assertFalse(fTestCache.isValid());
// Complete the cache's retrieve data request.
fExecutor.submit(new Callable<Object>() { public Object call() {
fRetrieveRm.setData(1);
fRetrieveRm.done();
// Check that the data is available in the cache immediately
// (in the same dispatch cycle).
Assert.assertEquals(1, (int)fTestCache.getData());
Assert.assertTrue(fTestCache.isValid());
return null;
}}).get();
Assert.assertEquals(1, (int)q.get());
// Re-check final state
assertCacheValidWithData(1);
}
@Test
public void getTest() throws InterruptedException, ExecutionException {
// Check initial state
Assert.assertFalse(fTestCache.isValid());
// Request data from cache
Query<Integer> q = new Query<Integer>() {
@Override
protected void execute(DataRequestMonitor<Integer> rm) {
fTestCache.request(rm);
}
};
fExecutor.execute(q);
// Wait until the cache starts data retrieval.
waitForRetrieveRm();
// Check state while waiting for data
Assert.assertFalse(fTestCache.isValid());
// Set the data without using an executor.
fRetrieveRm.setData(1);
fRetrieveRm.done();
Assert.assertEquals(1, (int)q.get());
// Check final state
assertCacheValidWithData(1);
}
@Test
public void getTestWithTwoClients() throws InterruptedException, ExecutionException {
// Check initial state
Assert.assertFalse(fTestCache.isValid());
// Request data from cache
Query<Integer> q1 = new Query<Integer>() {
@Override
protected void execute(DataRequestMonitor<Integer> rm) {
fTestCache.request(rm);
}
};
fExecutor.execute(q1);
// Request data from cache again
Query<Integer> q2 = new Query<Integer>() {
@Override
protected void execute(DataRequestMonitor<Integer> rm) {
fTestCache.request(rm);
}
};
fExecutor.execute(q2);
// Wait until the cache starts data retrieval.
waitForRetrieveRm();
// Check state while waiting for data
Assert.assertFalse(fTestCache.isValid());
// Set the data without using an executor.
fRetrieveRm.setData(1);
fRetrieveRm.done();
Assert.assertEquals(1, (int)q1.get());
Assert.assertEquals(1, (int)q2.get());
// Check final state
assertCacheValidWithData(1);
}
@Test
public void getTestWithManyClients() throws InterruptedException, ExecutionException {
// Check initial state
Assert.assertFalse(fTestCache.isValid());
// Request data from cache
List<Query<Integer>> qList = new ArrayList<Query<Integer>>();
for (int i = 0; i < 10; i++) {
Query<Integer> q = new Query<Integer>() {
@Override
protected void execute(DataRequestMonitor<Integer> rm) {
fTestCache.request(rm);
}
};
fExecutor.execute(q);
qList.add(q);
}
// Wait until the cache starts data retrieval.
waitForRetrieveRm();
// Check state while waiting for data
Assert.assertFalse(fTestCache.isValid());
// Set the data without using an executor.
fRetrieveRm.setData(1);
fRetrieveRm.done();
for (Query<Integer> q : qList) {
Assert.assertEquals(1, (int)q.get());
}
// Check final state
assertCacheValidWithData(1);
}
@Test
public void disableBeforeRequestTest() throws InterruptedException, ExecutionException {
// Disable the cache with a given value
fExecutor.submit(new DsfRunnable() {
public void run() {
fTestCache.disable();
}
}).get();
assertCacheDisabledWithoutData();
// Try to request data from cache
Query<Integer> q = new Query<Integer>() {
@Override
protected void execute(DataRequestMonitor<Integer> rm) {
fTestCache.request(rm);
}
};
fExecutor.execute(q);
Thread.sleep(100);
// Retrieval should never have been made.
Assert.assertEquals(null, fRetrieveRm);
try {
Assert.assertEquals(null, q.get());
} catch (ExecutionException e) {
// expected the exception
return;
}
Assert.fail("expected an exeption");
}
@Test
public void disableWhilePendingTest() throws InterruptedException, ExecutionException {
// Request data from cache
Query<Integer> q = new Query<Integer>() {
@Override
protected void execute(DataRequestMonitor<Integer> rm) {
fTestCache.request(rm);
}
};
fExecutor.execute(q);
// Disable the cache with a given value
fExecutor.submit(new DsfRunnable() {
public void run() {
fTestCache.disable();
}
}).get();
assertCacheDisabledWithoutData();
// Completed the retrieve RM
fExecutor.submit(new DsfRunnable() {
public void run() {
fRetrieveRm.setData(1);
fRetrieveRm.done();
}
}).get();
// Validate that cache is still disabled without data.
assertCacheDisabledWithoutData();
}
@Test
public void disableWhileValidTest() throws InterruptedException, ExecutionException {
// Request data from cache
Query<Integer> q = new Query<Integer>() {
@Override
protected void execute(DataRequestMonitor<Integer> rm) {
fTestCache.request(rm);
}
};
fExecutor.execute(q);
// Wait until the cache starts data retrieval.
waitForRetrieveRm();
// Complete the request
fRetrieveRm.setData(1);
fRetrieveRm.done();
q.get();
// Disable cache
fExecutor.submit(new DsfRunnable() {
public void run() {
fTestCache.disable();
}
}).get();
// Check final state
assertCacheValidWithData(1);
}
@Test
public void disableWithValueTest() throws InterruptedException, ExecutionException {
// Disable the cache with a given value
fExecutor.submit(new DsfRunnable() {
public void run() {
fTestCache.set(2, Status.OK_STATUS);
}
}).get();
// Validate that cache is disabled without data.
assertCacheValidWithData(2);
}
@Test
public void resetBeforeRequestTest() throws InterruptedException, ExecutionException {
// Disable the cache with a given value
fExecutor.submit(new DsfRunnable() {
public void run() {
fTestCache.reset();
}
}).get();
assertCacheResetWithoutData();
// Try to request data from cache (check that cache still works normally)
Query<Integer> q = new Query<Integer>() {
@Override
protected void execute(DataRequestMonitor<Integer> rm) {
fTestCache.request(rm);
}
};
fExecutor.execute(q);
// Wait until the cache starts data retrieval.
waitForRetrieveRm();
// Complete the request
fRetrieveRm.setData(1);
fRetrieveRm.done();
// Check result
Assert.assertEquals(1, (int)q.get());
assertCacheValidWithData(1);
}
@Test
public void resetWhilePendingTest() throws InterruptedException, ExecutionException {
// Request data from cache
Query<Integer> q = new Query<Integer>() {
@Override
protected void execute(DataRequestMonitor<Integer> rm) {
fTestCache.request(rm);
}
};
fExecutor.execute(q);
// Wait until the cache starts data retrieval.
waitForRetrieveRm();
// Disable the cache with a given value
fExecutor.submit(new DsfRunnable() {
public void run() {
fTestCache.reset();
}
}).get();
assertCacheResetWithoutData();
// Completed the retrieve RM
fExecutor.submit(new DsfRunnable() {
public void run() {
fRetrieveRm.setData(1);
fRetrieveRm.done();
}
}).get();
// Validate that cache is still disabled without data.
assertCacheResetWithoutData();
}
@Test
public void cancelWhilePendingTest() throws InterruptedException, ExecutionException {
// Request data from cache
Query<Integer> q = new Query<Integer>() {
@Override
protected void execute(DataRequestMonitor<Integer> rm) {
fTestCache.request(rm);
}
};
fExecutor.execute(q);
// Wait until the cache starts data retrieval.
waitForRetrieveRm();
// Cancel the client request
q.cancel(true);
try {
q.get();
Assert.fail("Expected a cancellation exception");
} catch (CancellationException e) {} // Expected exception;
assertCacheCanceled();
// Completed the retrieve RM
fExecutor.submit(new DsfRunnable() {
public void run() {
fRetrieveRm.setData(1);
fRetrieveRm.done();
}
}).get();
// Validate that cache accepts the canceled request data
assertCacheValidWithData(1);
}
@Test
public void cancelWhilePendingWithoutClientNotificationTest() throws InterruptedException, ExecutionException {
// Request data from cache
Query<Integer> q = new Query<Integer>() {
@Override
protected void execute(DataRequestMonitor<Integer> rm) {
fTestCache.request(new DataRequestMonitor<Integer>(ImmediateExecutor.getInstance(), rm) {
@Override
public synchronized void addCancelListener(ICanceledListener listener) {
// Do not add the cancel listener so that the cancel request is not
// propagated to the cache.
}
});
}
};
fExecutor.execute(q);
// Wait until the cache starts data retrieval.
waitForRetrieveRm();
// Cancel the client request
q.cancel(true);
assertCacheCanceled();
try {
q.get();
Assert.fail("Expected a cancellation exception");
} catch (CancellationException e) {} // Expected exception;
// Completed the retrieve RM
fExecutor.submit(new DsfRunnable() {
public void run() {
fRetrieveRm.setData(1);
fRetrieveRm.done();
}
}).get();
// Validate that cache accepts the canceled request data
assertCacheValidWithData(1);
}
@Test
public void cancelWhilePendingWithTwoClientsTest() throws InterruptedException, ExecutionException {
// Request data from cache
Query<Integer> q1 = new Query<Integer>() {
@Override
protected void execute(DataRequestMonitor<Integer> rm) {
fTestCache.request(rm);
}
};
fExecutor.execute(q1);
// Request data from cache again
Query<Integer> q2 = new Query<Integer>() {
@Override
protected void execute(DataRequestMonitor<Integer> rm) {
fTestCache.request(rm);
}
};
fExecutor.execute(q2);
// Wait until the cache starts data retrieval.
waitForRetrieveRm();
// Cancel the first client request
q1.cancel(true);
try {
q1.get();
Assert.fail("Expected a cancellation exception");
} catch (CancellationException e) {} // Expected exception;
assertCacheWaiting();
// Cancel the second request
q2.cancel(true);
try {
q2.get();
Assert.fail("Expected a cancellation exception");
} catch (CancellationException e) {} // Expected exception;
assertCacheCanceled();
// Completed the retrieve RM
fExecutor.submit(new DsfRunnable() {
public void run() {
fRetrieveRm.setData(1);
fRetrieveRm.done();
}
}).get();
// Validate that cache accepts the canceled request data
assertCacheValidWithData(1);
}
@Test
public void cancelWhilePendingWithManyClientsTest() throws InterruptedException, ExecutionException {
// Request data from cache
List<Query<Integer>> qList = new ArrayList<Query<Integer>>();
for (int i = 0; i < 10; i++) {
Query<Integer> q = new Query<Integer>() {
@Override
protected void execute(DataRequestMonitor<Integer> rm) {
fTestCache.request(rm);
}
};
fExecutor.execute(q);
qList.add(q);
}
// Wait until the cache starts data retrieval.
waitForRetrieveRm();
// Cancel some client requests
int[] toCancel = new int[] { 0, 2, 5, 9};
for (int i = 0; i < toCancel.length; i++) {
// Cancel request and verify that its canceled
Query<Integer> q = qList.get(toCancel[i]);
q.cancel(true);
try {
q.get();
Assert.fail("Expected a cancellation exception");
} catch (CancellationException e) {} // Expected exception;
qList.set(toCancel[i], null);
assertCacheWaiting();
}
// Replace canceled requests with new ones
for (int i = 0; i < toCancel.length; i++) {
Query<Integer> q = new Query<Integer>() {
@Override
protected void execute(DataRequestMonitor<Integer> rm) {
fTestCache.request(rm);
}
};
fExecutor.execute(q);
qList.set(toCancel[i], q);
assertCacheWaiting();
}
// Now cancel all requests
for (int i = 0; i < (qList.size() - 1); i++) {
// Validate that cache is still waiting and is not canceled
assertCacheWaiting();
qList.get(i).cancel(true);
}
qList.get(qList.size() - 1).cancel(true);
assertCacheCanceled();
// Completed the retrieve RM
fExecutor.submit(new DsfRunnable() {
public void run() {
fRetrieveRm.setData(1);
fRetrieveRm.done();
}
}).get();
// Validate that cache accepts the canceled request data
assertCacheValidWithData(1);
}
@Test
public void resetWhileValidTest() throws InterruptedException, ExecutionException {
// Request data from cache
Query<Integer> q = new Query<Integer>() {
@Override
protected void execute(DataRequestMonitor<Integer> rm) {
fTestCache.request(rm);
}
};
fExecutor.execute(q);
// Wait until the cache starts data retrieval.
waitForRetrieveRm();
// Complete the request
fRetrieveRm.setData(1);
fRetrieveRm.done();
q.get();
// Disable cache
fExecutor.submit(new DsfRunnable() {
public void run() {
fTestCache.reset();
}
}).get();
// Check final state
assertCacheResetWithoutData();
}
@Test
public void resetWithValueTest() throws InterruptedException, ExecutionException {
// Disable the cache with a given value
fExecutor.submit(new DsfRunnable() {
public void run() {
fTestCache.reset(2, Status.OK_STATUS);
}
}).get();
// Validate that cache is disabled without data.
Assert.assertFalse(fTestCache.isValid());
Assert.assertEquals(2, (int)fTestCache.getData());
Assert.assertTrue(fTestCache.getStatus().isOK());
}
}

View file

@ -0,0 +1,178 @@
/*******************************************************************************
* Copyright (c) 2006 Wind River Systems and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Wind River Systems - initial API and implementation
*******************************************************************************/
package org.eclipse.cdt.tests.dsf.concurrent;
import java.util.Arrays;
import java.util.concurrent.ExecutionException;
import junit.framework.Assert;
import org.eclipse.cdt.dsf.concurrent.RequestCache;
import org.eclipse.cdt.dsf.concurrent.DataRequestMonitor;
import org.eclipse.cdt.dsf.concurrent.DsfRunnable;
import org.eclipse.cdt.dsf.concurrent.ImmediateInDsfExecutor;
import org.eclipse.cdt.dsf.concurrent.Query;
import org.eclipse.cdt.dsf.concurrent.Transaction;
import org.eclipse.cdt.tests.dsf.TestDsfExecutor;
import org.eclipse.core.runtime.CoreException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
/**
* Tests that exercise the Transaction object.
*/
public class TransactionTests {
final static private int NUM_CACHES = 5;
TestDsfExecutor fExecutor;
TestCache[] fTestCaches = new TestCache[NUM_CACHES];
DataRequestMonitor<?>[] fRetrieveRms = new DataRequestMonitor<?>[NUM_CACHES];
class TestCache extends RequestCache<Integer> {
final private int fIndex;
public TestCache(int index) {
super(new ImmediateInDsfExecutor(fExecutor));
fIndex = index;
}
@Override
protected void retrieve(DataRequestMonitor<Integer> rm) {
synchronized(TransactionTests.this) {
fRetrieveRms[fIndex] = rm;
TransactionTests.this.notifyAll();
}
}
}
class TestSingleTransaction extends Transaction<Integer> {
@Override
protected Integer process() throws InvalidCacheException, CoreException {
validate(fTestCaches[0]);
return fTestCaches[0].getData();
}
}
class TestSumTransaction extends Transaction<Integer> {
@Override
protected Integer process() throws InvalidCacheException, CoreException {
validate(fTestCaches);
int sum = 0;
for (RequestCache<Integer> cache : fTestCaches) {
sum += cache.getData();
}
return sum;
}
}
/**
* There's no rule on how quickly the cache has to start data retrieval
* after it has been requested. It could do it immediately, or it could
* wait a dispatch cycle, etc..
*/
private void waitForRetrieveRm(boolean all) {
synchronized(this) {
if (all) {
while (Arrays.asList(fRetrieveRms).contains(null)) {
try {
wait();
} catch (InterruptedException e) {
return;
}
}
} else {
while (fRetrieveRms[0] == null) {
try {
wait();
} catch (InterruptedException e) {
return;
}
}
}
}
}
@Before
public void startExecutor() throws ExecutionException, InterruptedException {
fExecutor = new TestDsfExecutor();
for (int i = 0; i < fTestCaches.length; i++) {
fTestCaches[i] = new TestCache(i);
}
}
@After
public void shutdownExecutor() throws ExecutionException, InterruptedException {
fExecutor.submit(new DsfRunnable() { public void run() {
fExecutor.shutdown();
}}).get();
if (fExecutor.exceptionsCaught()) {
Throwable[] exceptions = fExecutor.getExceptions();
throw new ExecutionException(exceptions[0]);
}
fRetrieveRms = new DataRequestMonitor<?>[NUM_CACHES];
fTestCaches = new TestCache[NUM_CACHES];
fExecutor = null;
}
@Test
public void singleTransactionTest() throws InterruptedException, ExecutionException {
final TestSingleTransaction testTransaction = new TestSingleTransaction();
// Request data from cache
Query<Integer> q = new Query<Integer>() {
@Override
protected void execute(DataRequestMonitor<Integer> rm) {
testTransaction.request(rm);
}
};
fExecutor.execute(q);
// Wait until the cache starts data retrieval.
waitForRetrieveRm(false);
// Set the data without using an executor.
((DataRequestMonitor<Integer>)fRetrieveRms[0]).setData(1);
fRetrieveRms[0].done();
Assert.assertEquals(1, (int)q.get());
}
@Test
public void sumTransactionTest() throws InterruptedException, ExecutionException {
final TestSumTransaction testTransaction = new TestSumTransaction();
// Request data from cache
Query<Integer> q = new Query<Integer>() {
@Override
protected void execute(DataRequestMonitor<Integer> rm) {
testTransaction.request(rm);
}
};
fExecutor.execute(q);
// Wait until the cache starts data retrieval.
waitForRetrieveRm(true);
// Set the data without using an executor.
for (DataRequestMonitor<?> rm : fRetrieveRms) {
((DataRequestMonitor<Integer>)rm).setData(1);
rm.done();
}
fExecutor.execute(q);
Assert.assertEquals(NUM_CACHES, (int)q.get());
}
}