From 02c9e05cd577223b8f7a6320cf600c38b7ae8207 Mon Sep 17 00:00:00 2001 From: Pawel Piech Date: Thu, 1 Dec 2011 07:27:13 -0800 Subject: [PATCH] Bug 310345 - [concurrent] Asynchronous Cache Programming Model (ACPM) - Added missing files from last commit. --- .../dsf/dataviewer/ACPMSumDataGenerator.java | 234 +++++++++ .../dsf/dataviewer/ACPMSumDataViewer.java | 482 ++++++++++++++++++ .../dsf/dataviewer/AsyncSumDataGenerator.java | 171 +++++++ .../dsf/dataviewer/AsyncSumDataViewer.java | 410 +++++++++++++++ .../dataviewer/DataGeneratorCacheManager.java | 165 ++++++ 5 files changed, 1462 insertions(+) create mode 100644 dsf/org.eclipse.cdt.examples.dsf/src_preprocess/org/eclipse/cdt/examples/dsf/dataviewer/ACPMSumDataGenerator.java create mode 100644 dsf/org.eclipse.cdt.examples.dsf/src_preprocess/org/eclipse/cdt/examples/dsf/dataviewer/ACPMSumDataViewer.java create mode 100644 dsf/org.eclipse.cdt.examples.dsf/src_preprocess/org/eclipse/cdt/examples/dsf/dataviewer/AsyncSumDataGenerator.java create mode 100644 dsf/org.eclipse.cdt.examples.dsf/src_preprocess/org/eclipse/cdt/examples/dsf/dataviewer/AsyncSumDataViewer.java create mode 100644 dsf/org.eclipse.cdt.examples.dsf/src_preprocess/org/eclipse/cdt/examples/dsf/dataviewer/DataGeneratorCacheManager.java diff --git a/dsf/org.eclipse.cdt.examples.dsf/src_preprocess/org/eclipse/cdt/examples/dsf/dataviewer/ACPMSumDataGenerator.java b/dsf/org.eclipse.cdt.examples.dsf/src_preprocess/org/eclipse/cdt/examples/dsf/dataviewer/ACPMSumDataGenerator.java new file mode 100644 index 00000000000..889d59475dc --- /dev/null +++ b/dsf/org.eclipse.cdt.examples.dsf/src_preprocess/org/eclipse/cdt/examples/dsf/dataviewer/ACPMSumDataGenerator.java @@ -0,0 +1,234 @@ +/******************************************************************************* + * Copyright (c) 2011 Wind River Systems and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Wind River Systems - initial API and implementation + *******************************************************************************/ +//#ifdef exercises +package org.eclipse.cdt.examples.dsf.dataviewer; +//#else +//#package org.eclipse.cdt.examples.dsf.dataviewer.answers; +//#endif + +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; + +import org.eclipse.cdt.dsf.concurrent.DataRequestMonitor; +import org.eclipse.cdt.dsf.concurrent.DsfExecutor; +import org.eclipse.cdt.dsf.concurrent.DsfRunnable; +import org.eclipse.cdt.dsf.concurrent.ICache; +import org.eclipse.cdt.dsf.concurrent.ImmediateInDsfExecutor; +import org.eclipse.cdt.dsf.concurrent.RequestMonitor; +import org.eclipse.cdt.dsf.concurrent.Transaction; +import org.eclipse.core.runtime.CoreException; + +/** + * A data generator which performs a sum computation on data retrieved from a + * number of other data generators. The data retrieval from other generators + * is performed using ACPM caches and the result is calculated once all caches + * are valid. + *

+ * Unlike {@link AsyncSumDataGenerator}, this data generator listens to events + * from the individual the data providers. Theve events are used to + * invalidate caches to make sure that they don't return incorrect data. This + * generator also sends out events to its clients to notify them to update, or + * invalidate their caches. + *

+ */ +public class ACPMSumDataGenerator + implements IDataGenerator, IDataGenerator.Listener +{ + + /** + * DSF executor used to serialize data access within this data generator. + */ + final private DsfExecutor fExecutor; + + /** + * Data generators to retrieve original data to perform calculations on. + * The generators are accessed through the cache manager wrappers. + */ + final private DataGeneratorCacheManager[] fDataGeneratorCMs; + + /** + * List of listeners for this data generator. + */ + final private List fListeners = new LinkedList(); + + public ACPMSumDataGenerator(DsfExecutor executor, + IDataGenerator[] generators) + { + fExecutor = executor; + + // Create wrappers for data generators and add ourselves as listener + // to their events. + fDataGeneratorCMs = new DataGeneratorCacheManager[generators.length]; + ImmediateInDsfExecutor immediateExecutor = + new ImmediateInDsfExecutor(fExecutor); + for (int i = 0; i < generators.length; i++) { + fDataGeneratorCMs[i] = new DataGeneratorCacheManager( + immediateExecutor, generators[i]); + generators[i].addListener(this); + } + } + + public void getCount(final DataRequestMonitor rm) { + // Artificially delay the retrieval of the sum data to simulate + // real processing time. + fExecutor.schedule( new Runnable() { + public void run() { + // Create the transaction here to put all the ugly + // code in one place. + new Transaction() { + @Override + protected Integer process() + throws Transaction.InvalidCacheException, + CoreException + { + return processCount(this); + } + }.request(rm); + } + }, + PROCESSING_DELAY, TimeUnit.MILLISECONDS); + } + + /** + * Perform the calculation to get the max count for the given transaction. + * @param transaction The ACPM transaction to use for calculation. + * @return Calculated count. + * @throws Transaction.InvalidCacheException {@link Transaction#process} + * @throws CoreException See {@link Transaction#process} + */ + private Integer processCount(Transaction transaction) + throws Transaction.InvalidCacheException, CoreException + { + // Assemble all needed count caches into a collection. + List> countCaches = + new ArrayList>(fDataGeneratorCMs.length); + for (DataGeneratorCacheManager dataGeneratorCM : fDataGeneratorCMs) { + countCaches.add(dataGeneratorCM.getCount()); + } + // Validate all count caches at once. This executes needed requests + // in parallel. + transaction.validate(countCaches); + + // Calculate the max value and return. + int maxCount = 0; + for (ICache countCache : countCaches) { + maxCount = Math.max(maxCount, countCache.getData()); + } + return maxCount; + } + + public void getValue(final int index, final DataRequestMonitor rm) + { + // Add a processing delay. + fExecutor.schedule( new Runnable() { + public void run() { + new Transaction() { + @Override + protected Integer process() + throws Transaction.InvalidCacheException, + CoreException + { + return processValue(this, index); + } + }.request(rm); + } + }, + PROCESSING_DELAY, TimeUnit.MILLISECONDS); + } + + /** + * Perform the calculation to get the sum of values at given index. + * @param transaction The ACPM transaction to use for calculation. + * @param index Index of value to calculate. + * @return Calculated value. + * @throws Transaction.InvalidCacheException {@link Transaction#process} + * @throws CoreException See {@link Transaction#process} + */ + private Integer processValue(Transaction transaction, int index) + throws Transaction.InvalidCacheException, CoreException + { + List> valueCaches = + new ArrayList>(fDataGeneratorCMs.length); + for (DataGeneratorCacheManager dataGeneratorCM : fDataGeneratorCMs) { + valueCaches.add(dataGeneratorCM.getValue(index)); + } + // Validate all value caches at once. This executes needed requests + // in parallel. + transaction.validate(valueCaches); + + int sum = 0; + for (ICache valueCache : valueCaches) { + sum += valueCache.getData(); + } + return sum; + } + + public void shutdown(final RequestMonitor rm) { + for (DataGeneratorCacheManager dataGeneratorCM : fDataGeneratorCMs) { + dataGeneratorCM.getDataGenerator().removeListener(this); + dataGeneratorCM.dispose(); + rm.done(); + } + rm.done(); + } + + public void addListener(final Listener listener) { + // Must access fListeners on executor thread. + try { + fExecutor.execute( new DsfRunnable() { + public void run() { + fListeners.add(listener); + } + }); + } catch (RejectedExecutionException e) {} + } + + public void removeListener(final Listener listener) { + // Must access fListeners on executor thread. + try { + fExecutor.execute( new DsfRunnable() { + public void run() { + fListeners.remove(listener); + } + }); + } catch (RejectedExecutionException e) {} + } + + public void countChanged() { + // Must access fListeners on executor thread. + try { + fExecutor.execute( new DsfRunnable() { + public void run() { + for (Listener listener : fListeners) { + listener.countChanged(); + } + } + }); + } catch (RejectedExecutionException e) {} + } + + public void valuesChanged(final Set changed) { + // Must access fListeners on executor thread. + try { + fExecutor.execute( new DsfRunnable() { + public void run() { + for (Object listener : fListeners) { + ((Listener)listener).valuesChanged(changed); + } + } + }); + } catch (RejectedExecutionException e) {} + } +} diff --git a/dsf/org.eclipse.cdt.examples.dsf/src_preprocess/org/eclipse/cdt/examples/dsf/dataviewer/ACPMSumDataViewer.java b/dsf/org.eclipse.cdt.examples.dsf/src_preprocess/org/eclipse/cdt/examples/dsf/dataviewer/ACPMSumDataViewer.java new file mode 100644 index 00000000000..fd526e74f72 --- /dev/null +++ b/dsf/org.eclipse.cdt.examples.dsf/src_preprocess/org/eclipse/cdt/examples/dsf/dataviewer/ACPMSumDataViewer.java @@ -0,0 +1,482 @@ +/******************************************************************************* + * Copyright (c) 2011 Wind River Systems and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Wind River Systems - initial API and implementation + *******************************************************************************/ +//#ifdef exercises +package org.eclipse.cdt.examples.dsf.dataviewer; +//#else +//#package org.eclipse.cdt.examples.dsf.dataviewer.answers; +//#endif + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.eclipse.cdt.dsf.concurrent.ConfinedToDsfExecutor; +import org.eclipse.cdt.dsf.concurrent.CountingRequestMonitor; +import org.eclipse.cdt.dsf.concurrent.DataRequestMonitor; +import org.eclipse.cdt.dsf.concurrent.DefaultDsfExecutor; +import org.eclipse.cdt.dsf.concurrent.DsfExecutor; +import org.eclipse.cdt.dsf.concurrent.ICache; +import org.eclipse.cdt.dsf.concurrent.ImmediateExecutor; +import org.eclipse.cdt.dsf.concurrent.ImmediateInDsfExecutor; +import org.eclipse.cdt.dsf.concurrent.Query; +import org.eclipse.cdt.dsf.concurrent.ThreadSafe; +import org.eclipse.cdt.dsf.concurrent.Transaction; +import org.eclipse.cdt.dsf.ui.concurrent.DisplayDsfExecutor; +import org.eclipse.core.runtime.CoreException; +import org.eclipse.jface.viewers.ILazyContentProvider; +import org.eclipse.jface.viewers.TableViewer; +import org.eclipse.jface.viewers.Viewer; +import org.eclipse.swt.SWT; +import org.eclipse.swt.graphics.Font; +import org.eclipse.swt.layout.GridData; +import org.eclipse.swt.layout.GridLayout; +import org.eclipse.swt.widgets.Display; +import org.eclipse.swt.widgets.Shell; +import org.eclipse.swt.widgets.Table; + +/** + * Data viewer based on a table, which reads data from multiple data + * providers using ACPM methods and performs a computation on the + * retrieved data. + *

+ * This example builds on the {@link AsyncSumDataViewer} example. It + * demonstrates using ACPM to solve the data consistency problem when + * retrieving data from multiple sources asynchronously. + *

+ */ +@ConfinedToDsfExecutor("fDisplayExecutor") +public class ACPMSumDataViewer implements ILazyContentProvider +{ + /** View update frequency interval. */ + final private static int UPDATE_INTERVAL = 10000; + + /** Executor to use instead of Display.asyncExec(). **/ + @ThreadSafe + final private DsfExecutor fDisplayExecutor; + + /** Executor to use when retrieving data from data providers */ + @ThreadSafe + final private ImmediateInDsfExecutor fDataExecutor; + + // The viewer and generator that this content provider using. + final private TableViewer fViewer; + final private DataGeneratorCacheManager[] fDataGeneratorCMs; + final private DataGeneratorCacheManager fSumGeneratorCM; + + // Fields used in request cancellation logic. + private List fItemDataRequestMonitors = + new LinkedList(); + private Set fIndexesToCancel = new HashSet(); + private int fCancelCallsPending = 0; + private Future fRefreshFuture; + + public ACPMSumDataViewer(TableViewer viewer, + ImmediateInDsfExecutor dataExecutor, IDataGenerator[] generators, + IDataGenerator sumGenerator) + { + fViewer = viewer; + fDisplayExecutor = DisplayDsfExecutor.getDisplayDsfExecutor( + fViewer.getTable().getDisplay()); + fDataExecutor = dataExecutor; + + // Create wrappers for data generators. Don't need to register as + // listeners to generator events because the cache managers ensure data + // are already registered for them. + fDataGeneratorCMs = new DataGeneratorCacheManager[generators.length]; + for (int i = 0; i < generators.length; i++) { + fDataGeneratorCMs[i] = + new DataGeneratorCacheManager(fDataExecutor, generators[i]); + } + fSumGeneratorCM = + new DataGeneratorCacheManager(fDataExecutor, sumGenerator); + + // Schedule a task to refresh the viewer periodically. + fRefreshFuture = fDisplayExecutor.scheduleAtFixedRate( + new Runnable() { + public void run() { + queryItemCount(); + } + }, + UPDATE_INTERVAL, UPDATE_INTERVAL, TimeUnit.MILLISECONDS); + } + + public void dispose() { + // Cancel the periodic task of refreshing the view. + fRefreshFuture.cancel(false); + + // Need to dispose cache managers that were created in this class. This + // needs to be done on the cache manager's thread. + Query disposeCacheManagersQuery = new Query() { + @Override + protected void execute(DataRequestMonitor rm) { + fSumGeneratorCM.dispose(); + for (DataGeneratorCacheManager dataGeneratorCM : + fDataGeneratorCMs) + { + dataGeneratorCM.dispose(); + } + rm.setData(new Object()); + rm.done(); + } + }; + fDataExecutor.execute(disposeCacheManagersQuery); + try { + disposeCacheManagersQuery.get(); + } + catch (InterruptedException e) {} + catch (ExecutionException e) {} + + // Cancel any outstanding data requests. + for (ValueRequestMonitor rm : fItemDataRequestMonitors) { + rm.cancel(); + } + fItemDataRequestMonitors.clear(); + } + + public void inputChanged(Viewer viewer, Object oldInput, Object newInput) { + // Set the initial count to the viewer after the input is set. + queryItemCount(); + } + + public void updateElement(final int index) { + // Calculate the visible index range. + final int topIdx = fViewer.getTable().getTopIndex(); + final int botIdx = topIdx + getVisibleItemCount(topIdx); + + // Request the item for the given index. + queryValue(index); + + // Invoke a cancel task with a delay. The delay allows multiple cancel + // calls to be combined together improving performance of the viewer. + fCancelCallsPending++; + fDisplayExecutor.execute( + new Runnable() { public void run() { + cancelStaleRequests(topIdx, botIdx); + }}); + } + + /** + * Calculates the number of visible items based on the top item index and + * table bounds. + * @param top Index of top item. + * @return calculated number of items in viewer + */ + private int getVisibleItemCount(int top) { + Table table = fViewer.getTable(); + int itemCount = table.getItemCount(); + return Math.min( + (table.getBounds().height / table.getItemHeight()) + 2, + itemCount - top); + } + + /** + * Retrieve the current count. When a new count is set to viewer, the viewer + * will refresh all items as well. + */ + private void queryItemCount() { + // Create the request monitor to collect the count. This request + // monitor will be completed by the following transaction. + final DataRequestMonitor rm = + new DataRequestMonitor(fDisplayExecutor, null) + { + @Override + protected void handleSuccess() { + setCountToViewer(getData()); + } + @Override + protected void handleRejectedExecutionException() {} // Shutting down, ignore. + }; + + // Use a transaction, even with a single cache. This will ensure that + // if the cache is reset during processing by an event. The request + // for data will be re-issued. + fDataExecutor.execute(new Runnable() { + public void run() { + new Transaction() { + @Override + protected Integer process() + throws Transaction.InvalidCacheException, CoreException + { + return processCount(this); + } + }.request(rm); + } + }); + } + + /** + * Perform the count retrieval from the sum data generator. + * @param transaction The ACPM transaction to use for calculation. + * @return Calculated count. + * @throws Transaction.InvalidCacheException {@link Transaction#process} + * @throws CoreException See {@link Transaction#process} + */ + private Integer processCount(Transaction transaction) + throws Transaction.InvalidCacheException, CoreException + { + ICache countCache = fSumGeneratorCM.getCount(); + transaction.validate(countCache); + return countCache.getData(); + } + + /** + * Set the givne count to the viewer. This will cause the viewer will + * refresh all items' data as well. + *

Note: This method must be called in the display thread.

+ * @param count New count to set to viewer. + */ + private void setCountToViewer(int count) { + if (!fViewer.getTable().isDisposed()) { + fViewer.setItemCount(count); + fViewer.getTable().clearAll(); + } + } + + /** + * Retrieve the current value for given index. + */ + private void queryValue(final int index) { + // Create the request monitor to collect the value. This request + // monitor will be completed by the following transaction. + final ValueRequestMonitor rm = new ValueRequestMonitor(index) { + @Override + protected void handleCompleted() { + fItemDataRequestMonitors.remove(this); + if (isSuccess()) { + setValueToViewer(index, getData()); + } + } + @Override + protected void handleRejectedExecutionException() { + // Shutting down, ignore. + } + }; + + // Save the value request monitor, to cancel it if the view is + // scrolled. + fItemDataRequestMonitors.add(rm); + + // Use a transaction, even with a single cache. This will ensure that + // if the cache is reset during processing by an event. The request + // for data will be re-issued. + fDataExecutor.execute(new Runnable() { + public void run() { + new Transaction() { + @Override + protected String process() + throws Transaction.InvalidCacheException, CoreException + { + return processValue(this, index); + } + }.request(rm); + } + }); + } + + /** + * Write the view value to the viewer. + *

Note: This method must be called in the display thread.

+ * @param index Index of value to set. + * @param value New value. + */ + private void setValueToViewer(int index, String value) { + if (!fViewer.getTable().isDisposed()) { + fViewer.replace(value, index); + } + } + + /** + * Perform the calculation compose the string with data provider values + * and the sum. This implementation also validates the result. + * @param transaction The ACPM transaction to use for calculation. + * @param index Index of value to calculate. + * @return Calculated value. + * @throws Transaction.InvalidCacheException {@link Transaction#process} + * @throws CoreException See {@link Transaction#process} + */ + private String processValue(Transaction transaction, int index) + throws Transaction.InvalidCacheException, CoreException + { + List> valueCaches = + new ArrayList>(fDataGeneratorCMs.length); + for (DataGeneratorCacheManager dataGeneratorCM : fDataGeneratorCMs) { + valueCaches.add(dataGeneratorCM.getValue(index)); + } + // Validate all value caches at once. This executes needed requests + // in parallel. + transaction.validate(valueCaches); + + // TODO: evaluate sum generator cache in parallel with value caches. + ICache sumCache = fSumGeneratorCM.getValue(index); + transaction.validate(sumCache); + + // Compose the string with values, sum, and validation result. + StringBuilder result = new StringBuilder(); + int calcSum = 0; + for (ICache valueCache : valueCaches) { + if (result.length() != 0) result.append(" + "); + result.append(valueCache.getData()); + calcSum += valueCache.getData(); + } + result.append(" = "); + result.append(sumCache.getData()); + if (calcSum != sumCache.getData()) { + result.append(" !INCORRECT! "); + } + + return result.toString(); + } + + /** + * Dedicated class for data item requests. This class holds the index + * argument so it can be examined when canceling stale requests. + */ + private class ValueRequestMonitor extends DataRequestMonitor { + /** Index is used when canceling stale requests. */ + int fIndex; + + ValueRequestMonitor(int index) { + super(fDisplayExecutor, null); + fIndex = index; + } + + @Override + protected void handleRejectedExecutionException() { + // Shutting down, ignore. + } + } + + /** + * Cancels any outstanding value requests for items which are no longer + * visible in the viewer. + * + * @param topIdx Index of top visible item in viewer. + * @param botIdx Index of bottom visible item in viewer. + */ + private void cancelStaleRequests(int topIdx, int botIdx) { + // Decrement the count of outstanding cancel calls. + fCancelCallsPending--; + + // Must check again, in case disposed while re-dispatching. + if (fDataGeneratorCMs == null || fViewer.getTable().isDisposed()) { + return; + } + + // Go through the outstanding requests and cancel any that + // are not visible anymore. + for (Iterator itr = + fItemDataRequestMonitors.iterator(); itr.hasNext();) + { + ValueRequestMonitor item = itr.next(); + if (item.fIndex < topIdx || item.fIndex > botIdx) { + // Set the item to canceled status, so that the data provider + // will ignore it. + item.cancel(); + + // Add the item index to list of indexes that were canceled, + // which will be sent to the table widget. + fIndexesToCancel.add(item.fIndex); + + // Remove the item from the outstanding cancel requests. + itr.remove(); + } + } + if (!fIndexesToCancel.isEmpty() && fCancelCallsPending == 0) { + Set canceledIdxs = fIndexesToCancel; + fIndexesToCancel = new HashSet(); + + // Clear the indexes of the canceled request, so that the + // viewer knows to request them again when needed. + // Note: clearing using TableViewer.clear(int) seems very + // inefficient, it's better to use Table.clear(int[]). + int[] canceledIdxsArray = new int[canceledIdxs.size()]; + int i = 0; + for (Integer index : canceledIdxs) { + canceledIdxsArray[i++] = index; + } + fViewer.getTable().clear(canceledIdxsArray); + } + } + + /** + * The entry point for the example. + * @param args Program arguments. + */ + public static void main(String[] args) { + // Create the shell to hold the viewer. + Display display = new Display(); + Shell shell = new Shell(display, SWT.SHELL_TRIM); + shell.setLayout(new GridLayout()); + GridData data = new GridData(GridData.FILL_BOTH); + shell.setLayoutData(data); + Font font = new Font(display, "Courier", 10, SWT.NORMAL); + + // Create the table viewer. + TableViewer tableViewer = + new TableViewer(shell, SWT.BORDER | SWT.VIRTUAL); + tableViewer.getControl().setLayoutData(data); + + DsfExecutor executor = new DefaultDsfExecutor("Example executor"); + + // Create the data generator. + final IDataGenerator[] generators = new IDataGenerator[5]; + for (int i = 0; i < generators.length; i++) { + generators[i] = new DataGeneratorWithExecutor(executor); + } + final IDataGenerator sumGenerator = + new ACPMSumDataGenerator(executor, generators); + + // Create the content provider which will populate the viewer. + ACPMSumDataViewer contentProvider = new ACPMSumDataViewer( + tableViewer, new ImmediateInDsfExecutor(executor), + generators, sumGenerator); + tableViewer.setContentProvider(contentProvider); + tableViewer.setInput(new Object()); + + // Open the shell and service the display dispatch loop until user + // closes the shell. + shell.open(); + while (!shell.isDisposed()) { + if (!display.readAndDispatch()) + display.sleep(); + } + + // The IDataGenerator.shutdown() method is asynchronous, this requires + // using a query again in order to wait for its completion. + Query shutdownQuery = new Query() { + @Override + protected void execute(DataRequestMonitor rm) { + CountingRequestMonitor crm = new CountingRequestMonitor( + ImmediateExecutor.getInstance(), rm); + for (int i = 0; i < generators.length; i++) { + generators[i].shutdown(crm); + } + sumGenerator.shutdown(crm); + crm.setDoneCount(generators.length); + } + }; + + executor.execute(shutdownQuery); + try { + shutdownQuery.get(); + } catch (Exception e) {} + + // Shut down the display. + font.dispose(); + display.dispose(); + } +} diff --git a/dsf/org.eclipse.cdt.examples.dsf/src_preprocess/org/eclipse/cdt/examples/dsf/dataviewer/AsyncSumDataGenerator.java b/dsf/org.eclipse.cdt.examples.dsf/src_preprocess/org/eclipse/cdt/examples/dsf/dataviewer/AsyncSumDataGenerator.java new file mode 100644 index 00000000000..1d8a84c49e3 --- /dev/null +++ b/dsf/org.eclipse.cdt.examples.dsf/src_preprocess/org/eclipse/cdt/examples/dsf/dataviewer/AsyncSumDataGenerator.java @@ -0,0 +1,171 @@ +/******************************************************************************* + * Copyright (c) 2011 Wind River Systems and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Wind River Systems - initial API and implementation + *******************************************************************************/ +//#ifdef exercises +package org.eclipse.cdt.examples.dsf.dataviewer; +//#else +//#package org.eclipse.cdt.examples.dsf.dataviewer.answers; +//#endif + +import java.util.Arrays; +import java.util.concurrent.TimeUnit; + +import org.eclipse.cdt.dsf.concurrent.CountingRequestMonitor; +import org.eclipse.cdt.dsf.concurrent.DataRequestMonitor; +import org.eclipse.cdt.dsf.concurrent.DsfExecutor; +import org.eclipse.cdt.dsf.concurrent.ImmediateExecutor; +import org.eclipse.cdt.dsf.concurrent.RequestMonitor; + +/** + * A data generator which performs a sum computation on data retrieved from a + * number of other data generators. The data retrieval from other generators + * is performed in parallel and the result is calculated once all data is + * received. + *

+ * This calculating generator does not listen to events from the data + * providers so it relies on the client to re-retrieve data as needed. + *

+ */ +public class AsyncSumDataGenerator implements IDataGenerator { + + /** + * DSF executor used to serialize data access within this data generator. + */ + final private DsfExecutor fExecutor; + + /** + * Data generators to retrieve original data to perform calculations on. + */ + final private IDataGenerator[] fDataGenerators; + + public AsyncSumDataGenerator(DsfExecutor executor, + IDataGenerator[] generators) + { + fExecutor = executor; + fDataGenerators = generators; + } + + public void getCount(final DataRequestMonitor rm) { + // Artificially delay the retrieval of the sum data to simulate + // real processing time. + fExecutor.schedule( new Runnable() { + public void run() { + doGetCount(rm); + } + }, + PROCESSING_DELAY, TimeUnit.MILLISECONDS); + } + + /** + * Performs the actual count retrieval and calculation. + * @param rm Request monitor to complete with data. + */ + private void doGetCount(final DataRequestMonitor rm) { + // Array to store counts retrieved asynchronously + final int[] counts = new int[fDataGenerators.length]; + + // Counting request monitor is called once all data is retrieved. + final CountingRequestMonitor crm = + new CountingRequestMonitor(fExecutor, rm) + { + @Override + protected void handleSuccess() { + // Pick the highest count value. + Arrays.sort(counts, 0, counts.length - 1); + int maxCount = counts[counts.length - 1]; + rm.setData(maxCount); + rm.done(); + }; + }; + + // Each call to data generator fills in one value in array. + for (int i = 0; i < fDataGenerators.length; i++) { + final int finalI = i; + fDataGenerators[i].getCount( + new DataRequestMonitor( + ImmediateExecutor.getInstance(), crm) + { + @Override + protected void handleSuccess() { + counts[finalI] = getData(); + crm.done(); + } + }); + } + crm.setDoneCount(fDataGenerators.length); + } + + public void getValue(final int index, final DataRequestMonitor rm) + { + // Artificially delay the retrieval of the sum data to simulate + // real processing time. + fExecutor.schedule( new Runnable() { + public void run() { + doGetValue(index, rm); + } + }, + PROCESSING_DELAY, TimeUnit.MILLISECONDS); + } + + /** + * Performs the actual value retrieval and calculation. + * @param rm Request monitor to complete with data. + */ + private void doGetValue(int index, final DataRequestMonitor rm) { + // Array to store counts retrieved asynchronously + final int[] values = new int[fDataGenerators.length]; + + // Counting request monitor is called once all data is retrieved. + final CountingRequestMonitor crm = + new CountingRequestMonitor(fExecutor, rm) + { + @Override + protected void handleSuccess() { + // Sum up values in array. + int sum = 0; + for (int value : values) { + sum += value; + } + rm.setData(sum); + rm.done(); + }; + }; + + // Each call to data generator fills in one value in array. + for (int i = 0; i < fDataGenerators.length; i++) { + final int finalI = i; + fDataGenerators[i].getValue( + index, + new DataRequestMonitor( + ImmediateExecutor.getInstance(), crm) + { + @Override + protected void handleSuccess() { + values[finalI] = getData(); + crm.done(); + } + }); + } + crm.setDoneCount(fDataGenerators.length); + } + + public void shutdown(RequestMonitor rm) { + rm.done(); + } + + public void addListener(final Listener listener) { + // no events generated + } + + public void removeListener(Listener listener) { + // no events generated + } + +} diff --git a/dsf/org.eclipse.cdt.examples.dsf/src_preprocess/org/eclipse/cdt/examples/dsf/dataviewer/AsyncSumDataViewer.java b/dsf/org.eclipse.cdt.examples.dsf/src_preprocess/org/eclipse/cdt/examples/dsf/dataviewer/AsyncSumDataViewer.java new file mode 100644 index 00000000000..c34252562ee --- /dev/null +++ b/dsf/org.eclipse.cdt.examples.dsf/src_preprocess/org/eclipse/cdt/examples/dsf/dataviewer/AsyncSumDataViewer.java @@ -0,0 +1,410 @@ +/******************************************************************************* + * Copyright (c) 2011 Wind River Systems and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Wind River Systems - initial API and implementation + *******************************************************************************/ +//#ifdef exercises +package org.eclipse.cdt.examples.dsf.dataviewer; +//#else +//#package org.eclipse.cdt.examples.dsf.dataviewer.answers; +//#endif + +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.eclipse.cdt.dsf.concurrent.ConfinedToDsfExecutor; +import org.eclipse.cdt.dsf.concurrent.CountingRequestMonitor; +import org.eclipse.cdt.dsf.concurrent.DataRequestMonitor; +import org.eclipse.cdt.dsf.concurrent.DefaultDsfExecutor; +import org.eclipse.cdt.dsf.concurrent.DsfExecutor; +import org.eclipse.cdt.dsf.concurrent.ImmediateExecutor; +import org.eclipse.cdt.dsf.concurrent.Query; +import org.eclipse.cdt.dsf.concurrent.ThreadSafe; +import org.eclipse.cdt.dsf.ui.concurrent.DisplayDsfExecutor; +import org.eclipse.jface.viewers.ILazyContentProvider; +import org.eclipse.jface.viewers.TableViewer; +import org.eclipse.jface.viewers.Viewer; +import org.eclipse.swt.SWT; +import org.eclipse.swt.graphics.Font; +import org.eclipse.swt.layout.GridData; +import org.eclipse.swt.layout.GridLayout; +import org.eclipse.swt.widgets.Display; +import org.eclipse.swt.widgets.Shell; +import org.eclipse.swt.widgets.Table; + +/** + * Data viewer based on a table, which reads data from multiple data + * providers using asynchronous methods and performs a compultation + * on the retrieved data. + *

+ * This example builds on the {@link AsyncDataViewer} example and + * demonstrates the pitfalls of retrieving data from multiple sources + * asynchronously: The data is retrieved separate from a set of providers + * as well as from a data provider that sums the values from the other + * providers. The viewer then performs a check to ensure consistency of + * retrieved data. If the retrieved data is inconsistent an "INCORRECT" + * label is added in the viewer. + *

+ *

+ * This viewer is updated periodically every 10 seconds, instead of being + * updated with every change in every data provider, which would overwhelm + * the viewer. + *

+ */ +@ConfinedToDsfExecutor("fDisplayExecutor") +public class AsyncSumDataViewer implements ILazyContentProvider +{ + /** View update frequency interval. */ + final private static int UPDATE_INTERVAL = 10000; + + /** Executor to use instead of Display.asyncExec(). **/ + @ThreadSafe + final private DsfExecutor fDisplayExecutor; + + // The viewer and generator that this content provider using. + final private TableViewer fViewer; + final private IDataGenerator[] fDataGenerators; + final private IDataGenerator fSumGenerator; + + // Fields used in request cancellation logic. + private List fItemDataRequestMonitors = + new LinkedList(); + private Set fIndexesToCancel = new HashSet(); + private int fCancelCallsPending = 0; + private Future fRefreshFuture; + + public AsyncSumDataViewer(TableViewer viewer, + IDataGenerator[] generators, IDataGenerator sumGenerator) + { + fViewer = viewer; + fDisplayExecutor = DisplayDsfExecutor.getDisplayDsfExecutor( + fViewer.getTable().getDisplay()); + fDataGenerators = generators; + fSumGenerator = sumGenerator; + + // Schedule a task to refresh the viewer periodically. + fRefreshFuture = fDisplayExecutor.scheduleAtFixedRate( + new Runnable() { + public void run() { + queryItemCount(); + } + }, + UPDATE_INTERVAL, UPDATE_INTERVAL, TimeUnit.MILLISECONDS); + } + + public void dispose() { + // Cancel the periodic task of refreshing the view. + fRefreshFuture.cancel(false); + + // Cancel any outstanding data requests. + for (ValueCountingRequestMonitor rm : fItemDataRequestMonitors) { + rm.cancel(); + } + fItemDataRequestMonitors.clear(); + } + + public void inputChanged(Viewer viewer, Object oldInput, Object newInput) { + // Set the initial count to the viewer after the input is set. + queryItemCount(); + } + + public void updateElement(final int index) { + // Calculate the visible index range. + final int topIdx = fViewer.getTable().getTopIndex(); + final int botIdx = topIdx + getVisibleItemCount(topIdx); + + // Request the item for the given index. + queryValue(index); + + // Invoke a cancel task with a delay. The delay allows multiple cancel + // calls to be combined together improving performance of the viewer. + fCancelCallsPending++; + fDisplayExecutor.execute( + new Runnable() { public void run() { + cancelStaleRequests(topIdx, botIdx); + }}); + } + + /** + * Calculates the number of visible items based on the top item index and + * table bounds. + * @param top Index of top item. + * @return calculated number of items in viewer + */ + private int getVisibleItemCount(int top) { + Table table = fViewer.getTable(); + int itemCount = table.getItemCount(); + return Math.min( + (table.getBounds().height / table.getItemHeight()) + 2, + itemCount - top); + } + + /** + * Retrieve the up to date count. + */ + private void queryItemCount() { + // Note:The count is retrieved from the sum generator only, the sum + // generator is responsible for calculating the count based on + // individual data providers' counts. + fIndexesToCancel.clear(); + fSumGenerator.getCount( + new DataRequestMonitor(fDisplayExecutor, null) { + @Override + protected void handleSuccess() { + setCountToViewer(getData()); + } + @Override + protected void handleRejectedExecutionException() { + // Shutting down, ignore. + } + }); + } + + /** + * Set the givne count to the viewer. This will cause the viewer will + * refresh all items' data as well. + * @param count New count to set to viewer. + */ + private void setCountToViewer(int count) { + if (!fViewer.getTable().isDisposed()) { + fViewer.setItemCount(count); + fViewer.getTable().clearAll(); + } + } + + /** + * Retrieves value of an element at given index. When complete the value + * is written to the viewer. + * @param index Index of value to retrieve. + */ + private void queryValue(final int index) { + // Values retrieved asynchronously from providers are stored in local + // arrays. + final int[] values = new int[fDataGenerators.length]; + final int[] sum = new int[1]; + + // Counting request monitor is invoked when the required number of + // value requests is completed. + final ValueCountingRequestMonitor crm = + new ValueCountingRequestMonitor(index) + { + @Override + protected void handleCompleted() { + fItemDataRequestMonitors.remove(this); + + // Check if the request completed successfully, otherwise + // ignore it. + if (isSuccess()) { + StringBuilder result = new StringBuilder(); + int calcSum = 0; + for (int value : values) { + if (result.length() != 0) result.append(" + "); + result.append(value); + calcSum += value; + } + result.append(" = "); + result.append(sum[0]); + if (calcSum != sum[0]) { + result.append(" !INCORRECT! "); + } + setValueToViewer(fIndex, result.toString()); + } + }; + }; + + // Request data from each data generator. + for (int i = 0; i < fDataGenerators.length; i++) { + final int finalI = i; + fDataGenerators[i].getValue( + index, + // Use the display executor to construct the request monitor, + // this will cause the handleCompleted() method to be + // automatically called on the display thread. + new DataRequestMonitor( + ImmediateExecutor.getInstance(), crm) + { + @Override + protected void handleSuccess() { + values[finalI] = getData(); + crm.done(); + } + }); + } + + // Separately request data from the sum data generator. + fSumGenerator.getValue( + index, + new DataRequestMonitor( + ImmediateExecutor.getInstance(), crm) + { + @Override + protected void handleSuccess() { + sum[0] = getData(); + crm.done(); + } + }); + + crm.setDoneCount(fDataGenerators.length + 1); + fItemDataRequestMonitors.add(crm); + } + + /** + * Write the view value to the viewer. + *

Note: This method must be called in the display thread.

+ * @param index Index of value to set. + * @param value New value. + */ + private void setValueToViewer(int index, String value) { + if (!fViewer.getTable().isDisposed()) { + fViewer.replace(value, index); + } + } + + /** + * Dedicated class for data item requests. This class holds the index + * argument so it can be examined when canceling stale requests. + */ + private class ValueCountingRequestMonitor extends CountingRequestMonitor { + /** Index is used when canceling stale requests. */ + int fIndex; + + ValueCountingRequestMonitor(int index) { + super(fDisplayExecutor, null); + fIndex = index; + } + + @Override + protected void handleRejectedExecutionException() { + // Shutting down, ignore. + } + } + + /** + * Cancels any outstanding value requests for items which are no longer + * visible in the viewer. + * + * @param topIdx Index of top visible item in viewer. + * @param botIdx Index of bottom visible item in viewer. + */ + private void cancelStaleRequests(int topIdx, int botIdx) { + // Decrement the count of outstanding cancel calls. + fCancelCallsPending--; + + // Must check again, in case disposed while re-dispatching. + if (fDataGenerators == null || fViewer.getTable().isDisposed()) return; + + // Go through the outstanding requests and cancel any that + // are not visible anymore. + for (Iterator itr = + fItemDataRequestMonitors.iterator(); itr.hasNext();) + { + ValueCountingRequestMonitor item = itr.next(); + if (item.fIndex < topIdx || item.fIndex > botIdx) { + // Set the item to canceled status, so that the data provider + // will ignore it. + item.cancel(); + + // Add the item index to list of indexes that were canceled, + // which will be sent to the table widget. + fIndexesToCancel.add(item.fIndex); + + // Remove the item from the outstanding cancel requests. + itr.remove(); + } + } + if (!fIndexesToCancel.isEmpty() && fCancelCallsPending == 0) { + Set canceledIdxs = fIndexesToCancel; + fIndexesToCancel = new HashSet(); + + // Clear the indexes of the canceled request, so that the + // viewer knows to request them again when needed. + // Note: clearing using TableViewer.clear(int) seems very + // inefficient, it's better to use Table.clear(int[]). + int[] canceledIdxsArray = new int[canceledIdxs.size()]; + int i = 0; + for (Integer index : canceledIdxs) { + canceledIdxsArray[i++] = index; + } + fViewer.getTable().clear(canceledIdxsArray); + } + } + + /** + * The entry point for the example. + * @param args Program arguments. + */ + public static void main(String[] args) { + // Create the shell to hold the viewer. + Display display = new Display(); + Shell shell = new Shell(display, SWT.SHELL_TRIM); + shell.setLayout(new GridLayout()); + GridData data = new GridData(GridData.FILL_BOTH); + shell.setLayoutData(data); + Font font = new Font(display, "Courier", 10, SWT.NORMAL); + + // Create the table viewer. + TableViewer tableViewer = + new TableViewer(shell, SWT.BORDER | SWT.VIRTUAL); + tableViewer.getControl().setLayoutData(data); + + // Single executor (and single thread) is used by all data generators, + // including the sum generator. + DsfExecutor executor = new DefaultDsfExecutor("Example executor"); + + // Create the data generator. + final IDataGenerator[] generators = new IDataGenerator[5]; + for (int i = 0; i < generators.length; i++) { + generators[i] = new DataGeneratorWithExecutor(executor); + } + final IDataGenerator sumGenerator = + new AsyncSumDataGenerator(executor, generators); + + // Create the content provider which will populate the viewer. + AsyncSumDataViewer contentProvider = + new AsyncSumDataViewer(tableViewer, generators, sumGenerator); + tableViewer.setContentProvider(contentProvider); + tableViewer.setInput(new Object()); + + // Open the shell and service the display dispatch loop until user + // closes the shell. + shell.open(); + while (!shell.isDisposed()) { + if (!display.readAndDispatch()) + display.sleep(); + } + + // The IDataGenerator.shutdown() method is asynchronous, this requires + // using a query again in order to wait for its completion. + Query shutdownQuery = new Query() { + @Override + protected void execute(DataRequestMonitor rm) { + CountingRequestMonitor crm = new CountingRequestMonitor( + ImmediateExecutor.getInstance(), rm); + for (int i = 0; i < generators.length; i++) { + generators[i].shutdown(crm); + } + sumGenerator.shutdown(crm); + crm.setDoneCount(generators.length); + } + }; + + executor.execute(shutdownQuery); + try { + shutdownQuery.get(); + } catch (Exception e) {} + + // Shut down the display. + font.dispose(); + display.dispose(); + } +} diff --git a/dsf/org.eclipse.cdt.examples.dsf/src_preprocess/org/eclipse/cdt/examples/dsf/dataviewer/DataGeneratorCacheManager.java b/dsf/org.eclipse.cdt.examples.dsf/src_preprocess/org/eclipse/cdt/examples/dsf/dataviewer/DataGeneratorCacheManager.java new file mode 100644 index 00000000000..1e1d4e74e89 --- /dev/null +++ b/dsf/org.eclipse.cdt.examples.dsf/src_preprocess/org/eclipse/cdt/examples/dsf/dataviewer/DataGeneratorCacheManager.java @@ -0,0 +1,165 @@ +/******************************************************************************* + * Copyright (c) 2011 Wind River Systems and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Wind River Systems - initial API and implementation + *******************************************************************************/ +//#ifdef exercises +package org.eclipse.cdt.examples.dsf.dataviewer; +//#else +//#package org.eclipse.cdt.examples.dsf.dataviewer.answers; +//#endif + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import org.eclipse.cdt.dsf.concurrent.DataRequestMonitor; +import org.eclipse.cdt.dsf.concurrent.ICache; +import org.eclipse.cdt.dsf.concurrent.ImmediateInDsfExecutor; +import org.eclipse.cdt.dsf.concurrent.RequestCache; +import org.eclipse.cdt.examples.dsf.DsfExamplesPlugin; +import org.eclipse.core.runtime.IStatus; +import org.eclipse.core.runtime.Status; + +/** + * A wrapper class for the {@link IDataGenerator} interface, which returns + * ACPM cache objects to use for data retrieval instead of calling + * {@link IDataGenerator} asynchronous methods directly. + */ +public class DataGeneratorCacheManager implements IDataGenerator.Listener { + + /** Cache class for retrieving the data generator's count. */ + private class CountCache extends RequestCache { + + public CountCache() { + super(fExecutor); + } + + @Override + protected void retrieve(DataRequestMonitor rm) { + fDataGenerator.getCount(rm); + } + + /** + * Reset the cache when the count is changed. + */ + public void countChanged() { + // Make sure that if clients are currently waiting for a count, + // they are notified of the update (their request monitors will be + // completed with an error). They shoudl then re-request data + // from provider again. + setAndReset(null, new Status(IStatus.ERROR, DsfExamplesPlugin.PLUGIN_ID, "Count changed")); + } + } + + /** Cache class for retrieving the data generator's values. */ + private class ValueCache extends RequestCache { + private int fIndex; + + public ValueCache(int index) { + super(fExecutor); + fIndex = index; + } + + @Override + protected void retrieve(org.eclipse.cdt.dsf.concurrent.DataRequestMonitor rm) { + fDataGenerator.getValue(fIndex, rm); + }; + + /** + * @see CountCache#countChanged() + */ + public void valueChanged() { + setAndReset(null, new Status(IStatus.ERROR, DsfExamplesPlugin.PLUGIN_ID, "Value changed")); + } + } + + /** + * Executor used to synchronize data access in this cache manager. + * It has to be the same executor that is used by the data generators in + * order to guarantee data consistency. + */ + private ImmediateInDsfExecutor fExecutor; + + /** + * Data generator that this cache manager is a wrapper for. + */ + private IDataGenerator fDataGenerator; + + /** Cache for data generator's count */ + private CountCache fCountCache; + + /** + * Map of caches for retrieving values. Each value index has a separate + * cache value object. + */ + private Map fValueCaches = new HashMap(); + + public DataGeneratorCacheManager(ImmediateInDsfExecutor executor, IDataGenerator dataGenerator) { + fExecutor = executor; + fDataGenerator = dataGenerator; + fDataGenerator.addListener(this); + } + + public void dispose() { + fDataGenerator.removeListener(this); + } + + /** + * Returns the data generator that this cache manager wraps. + */ + public IDataGenerator getDataGenerator() { + return fDataGenerator; + } + + /** + * Returns the cache for data generator count. + */ + public ICache getCount() { + if (fCountCache == null) { + fCountCache = new CountCache(); + } + return fCountCache; + } + + /** + * Returns the cache for a value at given index. + * + * @param index Index of value to return. + * @return Cache object for given value. + */ + public ICache getValue(int index) { + ValueCache value = fValueCaches.get(index); + if (value == null) { + value = new ValueCache(index); + fValueCaches.put(index, value); + } + + return value; + } + + public void countChanged() { + // Reset the count cache and all the value caches. + if (fCountCache != null) { + fCountCache.countChanged(); + } + for (ValueCache value : fValueCaches.values()) { + value.valueChanged(); + } + } + + public void valuesChanged(Set indexes) { + // Reset selected value caches. + for (Integer index : indexes) { + ValueCache value = fValueCaches.get(index); + if (value != null) { + value.valueChanged(); + } + } + } +}