1
0
Fork 0
mirror of https://github.com/eclipse-cdt/cdt synced 2025-06-06 17:26:01 +02:00

Bug 310345 - [concurrent] Asynchronous Cache Programming Model (ACPM)

utilities for DSF - Added an ACPM example to DSF examples plugin.
This commit is contained in:
Pawel Piech 2011-11-30 13:40:41 -08:00
parent 3bd4f3d0fc
commit 58513ccf2d
9 changed files with 293 additions and 181 deletions

View file

@ -154,6 +154,27 @@ public abstract class AbstractCache<V> implements ICache<V> {
rm.done();
}
}
private void completeWaitingRms() {
Object waiting = null;
synchronized(this) {
waiting = fWaitingList;
fWaitingList = null;
}
if (waiting != null) {
if (waiting instanceof RequestMonitor) {
completeWaitingRm((RequestMonitor)waiting);
} else if (waiting instanceof RequestMonitor[]) {
RequestMonitor[] waitingList = (RequestMonitor[])waiting;
for (int i = 0; i < waitingList.length; i++) {
if (waitingList[i] != null) {
completeWaitingRm(waitingList[i]);
}
}
}
waiting = null;
}
}
private void completeWaitingRm(RequestMonitor rm) {
rm.setStatus(fStatus);
@ -300,23 +321,32 @@ public abstract class AbstractCache<V> implements ICache<V> {
fStatus = status;
fValid = true;
Object waiting = null;
synchronized(this) {
waiting = fWaitingList;
fWaitingList = null;
}
if (waiting != null) {
if (waiting instanceof RequestMonitor) {
completeWaitingRm((RequestMonitor)waiting);
} else if (waiting instanceof RequestMonitor[]) {
RequestMonitor[] waitingList = (RequestMonitor[])waiting;
for (int i = 0; i < waitingList.length; i++) {
if (waitingList[i] != null) {
completeWaitingRm(waitingList[i]);
}
}
}
waiting = null;
}
completeWaitingRms();
}
/**
* Performs the set and reset operations in one step This allows the cache to
* remain in invalid state, but to notify any waiting listeners that the state of
* the cache has changed.
*
* @param data
* The data that should be returned to any clients waiting for
* cache data and for clients requesting data until the cache is
* invalidated.
* @status The status that should be returned to any clients waiting for
* cache data and for clients requesting data until the cache is
* invalidated
*
* @see #reset(Object, IStatus)
*/
protected void setAndReset(V data, IStatus status) {
assert fExecutor.getDsfExecutor().isInExecutorThread();
fData = data;
fStatus = status;
fValid = false;
completeWaitingRms();
}
}

View file

@ -107,8 +107,7 @@ abstract public class RangeCache<V> {
protected List<V> process() throws InvalidCacheException, CoreException {
clearCanceledRequests();
List<ICache<?>> transactionRequests = getRequests(fOffset, fCount);
List<Request> transactionRequests = getRequests(fOffset, fCount);
validate(transactionRequests);
return makeElementsListFromRequests(transactionRequests, fOffset, fCount);
@ -156,7 +155,7 @@ abstract public class RangeCache<V> {
public ICache<List<V>> getRange(final long offset, final int count) {
assert fExecutor.getDsfExecutor().isInExecutorThread();
List<ICache<?>> requests = getRequests(offset, count);
List<Request> requests = getRequests(offset, count);
RequestCache<List<V>> range = new RequestCache<List<V>>(fExecutor) {
@Override
@ -232,8 +231,8 @@ abstract public class RangeCache<V> {
}
}
private List<ICache<?>> getRequests(long fOffset, int fCount) {
List<ICache<?>> requests = new ArrayList<ICache<?>>(1);
private List<Request> getRequests(long fOffset, int fCount) {
List<Request> requests = new ArrayList<Request>(1);
// Create a new request for the data to retrieve.
Request current = new Request(fOffset, fCount);
@ -252,7 +251,7 @@ abstract public class RangeCache<V> {
// Adjust the beginning of the requested range of data. If there
// is already an overlapping range in front of the requested range,
// then use it.
private Request adjustRequestHead(Request request, List<ICache<?>> transactionRequests, long offset, int count) {
private Request adjustRequestHead(Request request, List<Request> transactionRequests, long offset, int count) {
SortedSet<Request> headRequests = fRequests.headSet(request);
if (!headRequests.isEmpty()) {
Request headRequest = headRequests.last();
@ -276,7 +275,7 @@ abstract public class RangeCache<V> {
* @param transactionRequests
* @return
*/
private Request adjustRequestTail(Request current, List<ICache<?>> transactionRequests, long offset, int count) {
private Request adjustRequestTail(Request current, List<Request> transactionRequests, long offset, int count) {
// Create a duplicate of the tailSet, in order to avoid a concurrent modification exception.
List<Request> tailSet = new ArrayList<Request>(fRequests.tailSet(current));
@ -313,14 +312,13 @@ abstract public class RangeCache<V> {
return current;
}
private List<V> makeElementsListFromRequests(List<ICache<?>> requests, long offset, int count) {
private List<V> makeElementsListFromRequests(List<Request> requests, long offset, int count) {
List<V> retVal = new ArrayList<V>(count);
long index = offset;
long end = offset + count;
int requestIdx = 0;
while (index < end ) {
@SuppressWarnings("unchecked")
Request request = (Request)requests.get(requestIdx);
Request request = requests.get(requestIdx);
if (index < request.fOffset + request.fCount) {
retVal.add( request.getData().get((int)(index - request.fOffset)) );
index ++;

View file

@ -94,4 +94,13 @@ public abstract class RequestCache<V> extends AbstractCache<V> {
}
super.set(data, status);
}
@Override
protected void reset() {
if (fRm != null) {
fRm.cancel();
fRm = null;
}
super.reset();
}
}

View file

@ -70,8 +70,10 @@ public abstract class Transaction<V> {
* logic once the cache object has been updated from the source.
*
* @return the cached data if it's valid, otherwise an exception is thrown
* @throws InvalidCacheException
* @throws CoreException
* @throws Transaction.InvalidCacheException Exception indicating that a
* cache is not valid and transaction will need to be rescheduled.
* @throws CoreException Exception indicating that one of the caches is
* in error state and transaction cannot be processed.
*/
abstract protected V process() throws InvalidCacheException, CoreException;
@ -149,19 +151,20 @@ public abstract class Transaction<V> {
* See {@link #validate(RequestCache)}. This variant simply validates
* multiple cache objects.
*/
public void validate(ICache<?> ... caches) throws InvalidCacheException, CoreException {
public <T> void validate(ICache<?> ... caches) throws InvalidCacheException, CoreException {
validate(Arrays.asList(caches));
}
/**
* See {@link #validate(RequestCache)}. This variant simply validates
* multiple cache objects.
*/
public void validate(Iterable<ICache<?>> caches) throws InvalidCacheException, CoreException {
/**
* See {@link #validate(RequestCache)}. This variant simply validates
* multiple cache objects.
*/
public void validate(@SuppressWarnings("rawtypes") Iterable caches) throws InvalidCacheException, CoreException {
// Check if any of the caches have errors:
boolean allValid = true;
for (ICache<?> cache : caches) {
for (Object cacheObj : caches) {
ICache<?> cache = (ICache<?>)cacheObj;
if (cache.isValid()) {
if (!cache.getStatus().isOK()) {
throw new CoreException(cache.getStatus());
@ -171,9 +174,9 @@ public abstract class Transaction<V> {
}
}
if (!allValid) {
// Throw the invalid cache exception, but first schedule a
// re-attempt of the transaction logic, to occur when the
// stale/unset cache objects have been updated
// Throw the invalid cache exception, but first schedule a
// re-attempt of the transaction logic, to occur when the
// stale/unset cache objects have been updated
CountingRequestMonitor countringRm = new CountingRequestMonitor(ImmediateExecutor.getInstance(), fRm) {
@Override
protected void handleCompleted() {
@ -181,7 +184,8 @@ public abstract class Transaction<V> {
}
};
int count = 0;
for (ICache<?> cache : caches) {
for (Object cacheObj : caches) {
ICache<?> cache = (ICache<?>)cacheObj;
if (!cache.isValid()) {
cache.update(countringRm);
count++;

View file

@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2006, 2009 Wind River Systems and others.
* Copyright (c) 2006, 2011 Wind River Systems and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
@ -66,13 +66,15 @@ public class AsyncDataViewer
final private IDataGenerator fDataGenerator;
// Fields used in request cancellation logic.
private List<ValueDataRequestMonitor> fItemDataRequestMonitors = new LinkedList<ValueDataRequestMonitor>();
private List<ValueDataRequestMonitor> fItemDataRequestMonitors =
new LinkedList<ValueDataRequestMonitor>();
private Set<Integer> fIndexesToCancel = new HashSet<Integer>();
private int fCancelCallsPending = 0;
public AsyncDataViewer(TableViewer viewer, IDataGenerator generator) {
fViewer = viewer;
fDisplayExecutor = DisplayDsfExecutor.getDisplayDsfExecutor(fViewer.getTable().getDisplay());
fDisplayExecutor = DisplayDsfExecutor.getDisplayDsfExecutor(
fViewer.getTable().getDisplay());
fDataGenerator = generator;
fDataGenerator.addListener(this);
}
@ -104,10 +106,18 @@ public class AsyncDataViewer
1, TimeUnit.MILLISECONDS);
}
/**
* Calculates the number of visible items based on the top item index and
* table bounds.
* @param top Index of top item.
* @return calculated number of items in viewer
*/
private int getVisibleItemCount(int top) {
Table table = fViewer.getTable();
int itemCount = table.getItemCount();
return Math.min((table.getBounds().height / table.getItemHeight()) + 2, itemCount - top);
return Math.min(
(table.getBounds().height / table.getItemHeight()) + 2,
itemCount - top);
}
@ThreadSafe
@ -131,7 +141,10 @@ public class AsyncDataViewer
}});
}
/**
* Retrieve the up to date count. When a new count is set to viewer, the
* viewer will refresh all items as well.
*/
private void queryItemCount() {
// Request count from data provider. When the count is returned, we
// have to re-dispatch into the display thread to avoid calling
@ -150,13 +163,25 @@ public class AsyncDataViewer
}
}
});
}
// Dedicated class for data item requests. This class holds the index
// argument so it can be examined when canceling stale requests.
private class ValueDataRequestMonitor extends DataRequestMonitor<String> {
/**
* Retrieves value of an element at given index. When complete the value
* is written to the viewer.
* @param index Index of value to retrieve.
*/
private void queryValue(final int index) {
ValueDataRequestMonitor rm = new ValueDataRequestMonitor(index);
fItemDataRequestMonitors.add(rm);
fDataGenerator.getValue(index, rm);
}
/**
* Dedicated class for data item requests. This class holds the index
* argument so it can be examined when canceling stale requests.
*/
private class ValueDataRequestMonitor extends DataRequestMonitor<Integer> {
/** Index is used when canceling stale requests. */
int fIndex;
@ -170,7 +195,8 @@ public class AsyncDataViewer
protected void handleCompleted() {
fItemDataRequestMonitors.remove(this);
// Check if the request completed successfully, otherwise ignore it.
// Check if the request completed successfully, otherwise ignore
// it.
if (isSuccess()) {
if (!fViewer.getTable().isDisposed()) {
fViewer.replace(getData(), fIndex);
@ -178,12 +204,6 @@ public class AsyncDataViewer
}
}
}
private void queryValue(final int index) {
ValueDataRequestMonitor rm = new ValueDataRequestMonitor(index);
fItemDataRequestMonitors.add(rm);
fDataGenerator.getValue(index, rm);
}
private void cancelStaleRequests(int topIdx, int botIdx) {
// Decrement the count of outstanding cancel calls.
@ -194,7 +214,10 @@ public class AsyncDataViewer
// Go through the outstanding requests and cancel any that
// are not visible anymore.
for (Iterator<ValueDataRequestMonitor> itr = fItemDataRequestMonitors.iterator(); itr.hasNext();) {
for (Iterator<ValueDataRequestMonitor> itr =
fItemDataRequestMonitors.iterator();
itr.hasNext();)
{
ValueDataRequestMonitor item = itr.next();
if (item.fIndex < topIdx || item.fIndex > botIdx) {
// Set the item to canceled status, so that the data provider
@ -237,14 +260,16 @@ public class AsyncDataViewer
Font font = new Font(display, "Courier", 10, SWT.NORMAL);
// Create the table viewer.
TableViewer tableViewer = new TableViewer(shell, SWT.BORDER | SWT.VIRTUAL);
TableViewer tableViewer =
new TableViewer(shell, SWT.BORDER | SWT.VIRTUAL);
tableViewer.getControl().setLayoutData(data);
// Create the data generator.
final IDataGenerator generator = new DataGeneratorWithExecutor();
// Create the content provider which will populate the viewer.
AsyncDataViewer contentProvider = new AsyncDataViewer(tableViewer, generator);
AsyncDataViewer contentProvider =
new AsyncDataViewer(tableViewer, generator);
tableViewer.setContentProvider(contentProvider);
tableViewer.setInput(new Object());

View file

@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2006, 2009 Wind River Systems and others.
* Copyright (c) 2006, 2011 Wind River Systems and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
@ -14,14 +14,14 @@ package org.eclipse.cdt.examples.dsf.dataviewer;
//#package org.eclipse.cdt.examples.dsf.dataviewer.answers;
//#endif
import java.util.HashSet;
import java.util.HashMap;
//#ifdef answers
//#import java.util.Iterator;
//#endif
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.Map;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
@ -70,6 +70,16 @@ public class DataGeneratorWithExecutor implements IDataGenerator {
Request(RequestMonitor rm) {
fRequestMonitor = rm;
rm.addCancelListener(new RequestMonitor.ICanceledListener() {
public void requestCanceled(RequestMonitor rm) {
fExecutor.execute(new DsfRunnable() {
public void run() {
fQueue.remove(Request.this);
}
});
}
});
}
}
@ -93,7 +103,7 @@ public class DataGeneratorWithExecutor implements IDataGenerator {
//#endif
class ItemRequest extends Request {
final int fIndex;
ItemRequest(int index, DataRequestMonitor<String> rm) {
ItemRequest(int index, DataRequestMonitor<Integer> rm) {
super(rm);
fIndex = index;
}
@ -156,24 +166,20 @@ public class DataGeneratorWithExecutor implements IDataGenerator {
//#else
//# @ConfinedToDsfExecutor("fExecutor")
//#endif
private Set<Integer> fChangedIndexes = new HashSet<Integer>();
private Map<Integer, Integer> fChangedValues =
new HashMap<Integer, Integer>();
// Flag used to ensure that requests are processed sequentially.
//#ifdef exercises
// TODO Exercise 4 - Add an annotation (ThreadSafe/ConfinedToDsfExecutor)
// indicating allowed thread access to this class/method/member
//#else
//# @ConfinedToDsfExecutor("fExecutor")
//#endif
private boolean fServiceQueueInProgress = false;
//#ifdef exercises
// TODO Exercise 4 - Add an annotation (ThreadSafe/ConfinedToDsfExecutor)
// indicating allowed thread access to this class/method/member
//#endif
public DataGeneratorWithExecutor() {
// Create the executor
fExecutor = new DefaultDsfExecutor("Supplier Executor");
this(new DefaultDsfExecutor("Supplier Executor"));
}
//#ifdef exercises
// TODO Exercise 4 - Add an annotation (ThreadSafe/ConfinedToDsfExecutor)
// indicating allowed thread access to this class/method/member
//#endif
public DataGeneratorWithExecutor(DsfExecutor executor) {
// Create the executor
fExecutor = executor;
// Schedule a runnable to make the random changes.
fExecutor.scheduleAtFixedRate(
@ -182,8 +188,8 @@ public class DataGeneratorWithExecutor implements IDataGenerator {
randomChanges();
}
},
RANDOM_CHANGE_INTERVAL,
RANDOM_CHANGE_INTERVAL,
new Random().nextInt() % RANDOM_CHANGE_INTERVAL,
RANDOM_CHANGE_INTERVAL, //Add a 10% variance to the interval.
TimeUnit.MILLISECONDS);
}
@ -197,8 +203,9 @@ public class DataGeneratorWithExecutor implements IDataGenerator {
public void run() {
// Empty the queue of requests and fail them.
for (Request request : fQueue) {
request.fRequestMonitor.setStatus(
new Status(IStatus.ERROR, DsfExamplesPlugin.PLUGIN_ID, "Supplier shut down"));
request.fRequestMonitor.setStatus(new Status(
IStatus.ERROR, DsfExamplesPlugin.PLUGIN_ID,
"Supplier shut down"));
request.fRequestMonitor.done();
}
fQueue.clear();
@ -209,7 +216,8 @@ public class DataGeneratorWithExecutor implements IDataGenerator {
}
});
} catch (RejectedExecutionException e) {
rm.setStatus(new Status(IStatus.ERROR, DsfExamplesPlugin.PLUGIN_ID, "Supplier shut down"));
rm.setStatus(new Status(IStatus.ERROR, DsfExamplesPlugin.PLUGIN_ID,
"Supplier shut down"));
rm.done();
}
}
@ -227,7 +235,9 @@ public class DataGeneratorWithExecutor implements IDataGenerator {
}
});
} catch (RejectedExecutionException e) {
rm.setStatus(new Status(IStatus.ERROR, DsfExamplesPlugin.PLUGIN_ID, "Supplier shut down"));
rm.setStatus(new Status(
IStatus.ERROR, DsfExamplesPlugin.PLUGIN_ID,
"Supplier shut down"));
rm.done();
}
}
@ -236,7 +246,7 @@ public class DataGeneratorWithExecutor implements IDataGenerator {
// TODO Exercise 4 - Add an annotation (ThreadSafe/ConfinedToDsfExecutor)
// indicating allowed thread access to this class/method/member
//#endif
public void getValue(final int index, final DataRequestMonitor<String> rm) {
public void getValue(final int index, final DataRequestMonitor<Integer> rm) {
try {
fExecutor.execute( new DsfRunnable() {
public void run() {
@ -245,7 +255,8 @@ public class DataGeneratorWithExecutor implements IDataGenerator {
}
});
} catch (RejectedExecutionException e) {
rm.setStatus(new Status(IStatus.ERROR, DsfExamplesPlugin.PLUGIN_ID, "Supplier shut down"));
rm.setStatus(new Status(IStatus.ERROR, DsfExamplesPlugin.PLUGIN_ID,
"Supplier shut down"));
rm.done();
}
}
@ -286,7 +297,16 @@ public class DataGeneratorWithExecutor implements IDataGenerator {
//# @ConfinedToDsfExecutor("fExecutor")
//#endif
private void serviceQueue() {
fExecutor.schedule(
new DsfRunnable() {
public void run() {
doServiceQueue();
}
},
PROCESSING_DELAY, TimeUnit.MILLISECONDS);
}
private void doServiceQueue() {
//#ifdef exercises
// TODO Exercise 3 - Add logic to discard cancelled requests from queue.
// Hint: Since serviceQueue() is called using the executor, and the
@ -305,33 +325,16 @@ public class DataGeneratorWithExecutor implements IDataGenerator {
//# }
//#endif
// If a queue servicing is already scheduled, do nothing.
if (fServiceQueueInProgress) {
return;
}
if (fQueue.size() != 0) {
while (fQueue.size() != 0) {
// If there are requests to service, remove one from the queue and
// schedule a runnable to process the request after a processing
// delay.
fServiceQueueInProgress = true;
final Request request = fQueue.remove(0);
fExecutor.schedule(
new DsfRunnable() {
public void run() {
if (request instanceof CountRequest) {
processCountRequest((CountRequest)request);
} else if (request instanceof ItemRequest) {
processItemRequest((ItemRequest)request);
}
// Reset the processing flag and process next
// request.
fServiceQueueInProgress = false;
serviceQueue();
}
},
PROCESSING_DELAY, TimeUnit.MILLISECONDS);
Request request = fQueue.remove(0);
if (request instanceof CountRequest) {
processCountRequest((CountRequest)request);
} else if (request instanceof ItemRequest) {
processItemRequest((ItemRequest)request);
}
}
}
@ -343,7 +346,8 @@ public class DataGeneratorWithExecutor implements IDataGenerator {
//#endif
private void processCountRequest(CountRequest request) {
@SuppressWarnings("unchecked") // Suppress warning about lost type info.
DataRequestMonitor<Integer> rm = (DataRequestMonitor<Integer>)request.fRequestMonitor;
DataRequestMonitor<Integer> rm =
(DataRequestMonitor<Integer>)request.fRequestMonitor;
rm.setData(fCount);
rm.done();
@ -357,12 +361,13 @@ public class DataGeneratorWithExecutor implements IDataGenerator {
//#endif
private void processItemRequest(ItemRequest request) {
@SuppressWarnings("unchecked") // Suppress warning about lost type info.
DataRequestMonitor<String> rm = (DataRequestMonitor<String>)request.fRequestMonitor;
DataRequestMonitor<Integer> rm =
(DataRequestMonitor<Integer>)request.fRequestMonitor;
if (fChangedIndexes.contains(request.fIndex)) {
rm.setData("Changed: " + request.fIndex);
if (fChangedValues.containsKey(request.fIndex)) {
rm.setData(fChangedValues.get(request.fIndex));
} else {
rm.setData(Integer.toString(request.fIndex));
rm.setData(request.fIndex);
}
rm.done();
}
@ -398,10 +403,11 @@ public class DataGeneratorWithExecutor implements IDataGenerator {
private void randomCountReset() {
// Calculate the new count.
Random random = new java.util.Random();
fCount = MIN_COUNT + Math.abs(random.nextInt()) % (MAX_COUNT - MIN_COUNT);
fCount = MIN_COUNT +
Math.abs(random.nextInt()) % (MAX_COUNT - MIN_COUNT);
// Reset the changed values.
fChangedIndexes.clear();
fChangedValues.clear();
// Notify listeners
for (Listener listener : fListeners) {
@ -421,17 +427,19 @@ public class DataGeneratorWithExecutor implements IDataGenerator {
private void randomDataChange() {
// Calculate the indexes to change.
Random random = new java.util.Random();
Set<Integer> set = new HashSet<Integer>();
Map<Integer, Integer> changed = new HashMap<Integer, Integer>();
for (int i = 0; i < fCount * RANDOM_CHANGE_SET_PERCENTAGE / 100; i++) {
set.add( new Integer(Math.abs(random.nextInt()) % fCount) );
}
int randomIndex = Math.abs(random.nextInt()) % fCount;
int randomValue = Math.abs(random.nextInt()) % fCount;
changed.put(randomIndex, randomValue);
}
// Add the indexes to an overall set of changed indexes.
fChangedIndexes.addAll(set);
fChangedValues.putAll(changed);
// Notify listeners
for (Listener listener : fListeners) {
listener.valuesChanged(set);
for (Object listener : fListeners) {
((Listener)listener).valuesChanged(changed.keySet());
}
}
}

View file

@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2006, 2009 Wind River Systems and others.
* Copyright (c) 2006, 2011 Wind River Systems and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
@ -15,9 +15,9 @@ package org.eclipse.cdt.examples.dsf.dataviewer;
//#endif
import java.util.Collections;
import java.util.HashSet;
import java.util.HashMap;
import java.util.Random;
import java.util.Set;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
@ -41,7 +41,9 @@ import org.eclipse.cdt.examples.dsf.DsfExamplesPlugin;
* synchronization.
* </p>
*/
public class DataGeneratorWithThread extends Thread implements IDataGenerator {
public class DataGeneratorWithThread extends Thread
implements IDataGenerator
{
// Request objects are used to serialize the interface calls into objects
// which can then be pushed into a queue.
@ -61,7 +63,7 @@ public class DataGeneratorWithThread extends Thread implements IDataGenerator {
class ItemRequest extends Request {
final int fIndex;
ItemRequest(int index, DataRequestMonitor<String> rm) {
ItemRequest(int index, DataRequestMonitor<Integer> rm) {
super(rm);
fIndex = index;
}
@ -76,7 +78,8 @@ public class DataGeneratorWithThread extends Thread implements IDataGenerator {
// Main request queue of the data generator. The getValue(), getCount(),
// and shutdown() methods write into the queue, while the run() method
// reads from it.
private final BlockingQueue<Request> fQueue = new LinkedBlockingQueue<Request>();
private final BlockingQueue<Request> fQueue =
new LinkedBlockingQueue<Request>();
// ListenerList class provides thread safety.
private ListenerList fListeners = new ListenerList();
@ -88,7 +91,8 @@ public class DataGeneratorWithThread extends Thread implements IDataGenerator {
private int fCountResetTrigger = 0;
// Elements which were modified since the last reset.
private Set<Integer> fChangedIndexes = Collections.synchronizedSet(new HashSet<Integer>());
private Map<Integer, Integer> fChangedValues =
Collections.synchronizedMap(new HashMap<Integer, Integer>());
// Used to determine when to make changes in data.
private long fLastChangeTime = System.currentTimeMillis();
@ -108,7 +112,8 @@ public class DataGeneratorWithThread extends Thread implements IDataGenerator {
fQueue.add(new ShutdownRequest(rm));
} else {
//
rm.setStatus(new Status(IStatus.ERROR, DsfExamplesPlugin.PLUGIN_ID, "Supplier shut down"));
rm.setStatus(new Status(IStatus.ERROR, DsfExamplesPlugin.PLUGIN_ID,
"Supplier shut down"));
rm.done();
}
}
@ -117,16 +122,18 @@ public class DataGeneratorWithThread extends Thread implements IDataGenerator {
if (!fShutdown.get()) {
fQueue.add(new CountRequest(rm));
} else {
rm.setStatus(new Status(IStatus.ERROR, DsfExamplesPlugin.PLUGIN_ID, "Supplier shut down"));
rm.setStatus(new Status(IStatus.ERROR, DsfExamplesPlugin.PLUGIN_ID,
"Supplier shut down"));
rm.done();
}
}
public void getValue(int index, DataRequestMonitor<String> rm) {
public void getValue(int index, DataRequestMonitor<Integer> rm) {
if (!fShutdown.get()) {
fQueue.add(new ItemRequest(index, rm));
} else {
rm.setStatus(new Status(IStatus.ERROR, DsfExamplesPlugin.PLUGIN_ID, "Supplier shut down"));
rm.setStatus(new Status(IStatus.ERROR, DsfExamplesPlugin.PLUGIN_ID,
"Supplier shut down"));
rm.done();
}
}
@ -150,7 +157,6 @@ public class DataGeneratorWithThread extends Thread implements IDataGenerator {
// If a request was dequeued, process it.
if (request != null) {
// Simulate a processing delay.
Thread.sleep(PROCESSING_DELAY);
if (request instanceof CountRequest) {
processCountRequest((CountRequest)request);
@ -162,6 +168,8 @@ public class DataGeneratorWithThread extends Thread implements IDataGenerator {
request.fRequestMonitor.done();
break;
}
} else {
Thread.sleep(PROCESSING_DELAY);
}
// Simulate data changes.
@ -173,7 +181,8 @@ public class DataGeneratorWithThread extends Thread implements IDataGenerator {
private void processCountRequest(CountRequest request) {
@SuppressWarnings("unchecked") // Suppress warning about lost type info.
DataRequestMonitor<Integer> rm = (DataRequestMonitor<Integer>)request.fRequestMonitor;
DataRequestMonitor<Integer> rm =
(DataRequestMonitor<Integer>)request.fRequestMonitor;
rm.setData(fCount);
rm.done();
@ -181,12 +190,13 @@ public class DataGeneratorWithThread extends Thread implements IDataGenerator {
private void processItemRequest(ItemRequest request) {
@SuppressWarnings("unchecked") // Suppress warning about lost type info.
DataRequestMonitor<String> rm = (DataRequestMonitor<String>)request.fRequestMonitor;
DataRequestMonitor<Integer> rm =
(DataRequestMonitor<Integer>)request.fRequestMonitor;
if (fChangedIndexes.contains(request.fIndex)) {
rm.setData("Changed: " + request.fIndex);
if (fChangedValues.containsKey(request.fIndex)) {
rm.setData(fChangedValues.get(request.fIndex));
} else {
rm.setData(Integer.toString(request.fIndex));
rm.setData(request.fIndex);
}
rm.done();
}
@ -194,12 +204,14 @@ public class DataGeneratorWithThread extends Thread implements IDataGenerator {
private void randomChanges() {
// Check if enough time is elapsed.
if (System.currentTimeMillis() > fLastChangeTime + RANDOM_CHANGE_INTERVAL) {
if (System.currentTimeMillis() >
fLastChangeTime + RANDOM_CHANGE_INTERVAL)
{
fLastChangeTime = System.currentTimeMillis();
// Once every number of changes, reset the count, the rest of the
// times just change certain values.
if (++fCountResetTrigger % RANDOM_COUNT_CHANGE_INTERVALS == 0){
if (++fCountResetTrigger % RANDOM_COUNT_CHANGE_INTERVALS == 0) {
randomCountReset();
} else {
randomDataChange();
@ -213,7 +225,7 @@ public class DataGeneratorWithThread extends Thread implements IDataGenerator {
fCount = MIN_COUNT + Math.abs(random.nextInt()) % (MAX_COUNT - MIN_COUNT);
// Reset the changed values.
fChangedIndexes.clear();
fChangedValues.clear();
// Notify listeners
for (Object listener : fListeners.getListeners()) {
@ -224,17 +236,19 @@ public class DataGeneratorWithThread extends Thread implements IDataGenerator {
private void randomDataChange() {
// Calculate the indexes to change.
Random random = new java.util.Random();
Set<Integer> set = new HashSet<Integer>();
Map<Integer, Integer> changed = new HashMap<Integer, Integer>();
for (int i = 0; i < fCount * RANDOM_CHANGE_SET_PERCENTAGE / 100; i++) {
set.add( new Integer(Math.abs(random.nextInt()) % fCount) );
int randomIndex = Math.abs(random.nextInt()) % fCount;
int randomValue = Math.abs(random.nextInt()) % fCount;
changed.put(randomIndex, randomValue);
}
// Add the indexes to an overall set of changed indexes.
fChangedIndexes.addAll(set);
fChangedValues.putAll(changed);
// Notify listeners
for (Object listener : fListeners.getListeners()) {
((Listener)listener).valuesChanged(set);
((Listener)listener).valuesChanged(changed.keySet());
}
}
}

View file

@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2006, 2009 Wind River Systems and others.
* Copyright (c) 2006, 2011 Wind River Systems and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
@ -41,11 +41,11 @@ public interface IDataGenerator {
// Changing the count range can stress the scalability of the system, while
// changing of the process delay and random change interval can stress
// its performance.
final static int MIN_COUNT = 100;
final static int MAX_COUNT = 200;
final static int PROCESSING_DELAY = 10;
final static int RANDOM_CHANGE_INTERVAL = 10000;
final static int RANDOM_COUNT_CHANGE_INTERVALS = 3;
final static int MIN_COUNT = 50;
final static int MAX_COUNT = 100;
final static int PROCESSING_DELAY = 500;
final static int RANDOM_CHANGE_INTERVAL = 4000;
final static int RANDOM_COUNT_CHANGE_INTERVALS = 5;
final static int RANDOM_CHANGE_SET_PERCENTAGE = 10;
@ -58,7 +58,7 @@ public interface IDataGenerator {
// Data access methods.
void getCount(DataRequestMonitor<Integer> rm);
void getValue(int index, DataRequestMonitor<String> rm);
void getValue(int index, DataRequestMonitor<Integer> rm);
// Method used to shutdown the data generator including any threads that
// it may use.

View file

@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2008, 2009 Wind River Systems and others.
* Copyright (c) 2008, 2011 Wind River Systems and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
@ -14,8 +14,11 @@ package org.eclipse.cdt.examples.dsf.dataviewer;
//#package org.eclipse.cdt.examples.dsf.dataviewer.answers;
//#endif
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import org.eclipse.cdt.dsf.concurrent.CountingRequestMonitor;
import org.eclipse.cdt.dsf.concurrent.DataRequestMonitor;
import org.eclipse.cdt.dsf.concurrent.ImmediateExecutor;
import org.eclipse.cdt.dsf.concurrent.Query;
@ -35,8 +38,8 @@ import org.eclipse.swt.widgets.Shell;
* This viewer implements the {@link IStructuredContentProvider} interface
* which is used by the JFace TableViewer class to populate a Table. This
* interface contains one principal methods for reading data {@link #getElements(Object)},
* which synchronously returns an array of elements. In order to implement this
* method using the asynchronous data generator, this provider uses the
* which synchronously returns an array of elements. In order to implement
* this method using the asynchronous data generator, this provider uses the
* {@link Query} object.
* </p>
*/
@ -84,27 +87,43 @@ public class SyncDataViewer
return new Object[0];
}
// Create the array that will be filled with elements.
// For each index in the array execute a query to get the element at
// that index.
final Object[] elements = new Object[count];
for (int i = 0; i < count; i++) {
final int index = i;
Query<String> valueQuery = new Query<String>() {
@Override
protected void execute(DataRequestMonitor<String> rm) {
fDataGenerator.getValue(index, rm);
final int finalCount = count;
Query<List<Integer>> valueQuery = new Query<List<Integer>>() {
@Override
protected void execute(final DataRequestMonitor<List<Integer>> rm) {
final Integer[] retVal = new Integer[finalCount];
final CountingRequestMonitor crm = new CountingRequestMonitor(
ImmediateExecutor.getInstance(), rm)
{
@Override
protected void handleSuccess() {
rm.setData(Arrays.asList(retVal));
rm.done();
};
};
for (int i = 0; i < finalCount; i++) {
final int finalI = i;
fDataGenerator.getValue(
i,
new DataRequestMonitor<Integer>(
ImmediateExecutor.getInstance(), crm)
{
@Override
protected void handleSuccess() {
retVal[finalI] = getData();
crm.done();
}
});
}
};
ImmediateExecutor.getInstance().execute(valueQuery);
try {
elements[i] = valueQuery.get();
} catch (Exception e) {
elements[i] = "error";
}
crm.setDoneCount(finalCount);
}
};
ImmediateExecutor.getInstance().execute(valueQuery);
try {
return valueQuery.get().toArray(new Integer[0]);
} catch (Exception e) {
}
return elements;
return new Object[0];
}
public void dispose() {
@ -140,6 +159,10 @@ public class SyncDataViewer
});
}
/**
* The entry point for the example.
* @param args Program arguments.
*/
public static void main(String[] args) {
// Create the shell to hold the viewer.
Display display = new Display();
@ -162,7 +185,8 @@ public class SyncDataViewer
//#endif
// Create the content provider which will populate the viewer.
SyncDataViewer contentProvider = new SyncDataViewer(tableViewer, generator);
SyncDataViewer contentProvider =
new SyncDataViewer(tableViewer, generator);
tableViewer.setContentProvider(contentProvider);
tableViewer.setInput(new Object());