This example activity, called List Reduction, processes the data in a list, reducing the list to a
single value that is then output. The activity has two inputs. The
first input is called initialValue and expects
to receive objects of type Integer. The second
input is called values and expects to
receive lists of Integer objects. There is one output
called output.
The activity reads a value from the initialValue input and a list
of values from the values input and adds all the values in the list
to the initial value. The result of this sum is written as an Integer
to the output.
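The calculation performed in each iteration can be sketched in plain Java, independently of OGSA-DAI. This is only an illustration of the arithmetic; the class and method names (ListReductionSemantics, reduce) are made up for this sketch and are not part of the activity.

```java
import java.util.Arrays;
import java.util.List;

class ListReductionSemantics {
    /** Adds every value in the list to the initial value (hypothetical helper). */
    static int reduce(int initialValue, List<Integer> values) {
        int result = initialValue;
        for (int v : values) {
            result += v;
        }
        return result;
    }

    public static void main(String[] args) {
        // 10 + (1 + 2 + 3) = 16
        System.out.println(reduce(10, Arrays.asList(1, 2, 3)));
    }
}
```

Running the sketch prints 16. An empty list leaves the initial value unchanged.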
We implement the server-side activity by extending the
MatchedIterativeActivity base class. We must implement the
getIterationInputs method to return details of the inputs to the
activity. This is implemented as:
protected ActivityInput[] getIterationInputs()
{
    return new ActivityInput[] {
        new TypedActivityInput("initialValue", Integer.class),
        new TypedListActivityInput("values", Integer.class)};
}
We have used TypedActivityInput for the
initialValue input and
specified that blocks must be of type Integer.
For the values input that expects to receive
lists of Integer objects we use
the TypedListActivityInput class to specify
that we expect a list and provide Integer.class
as a parameter value to specify that
the objects inside the list should be of type Integer.
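As a rough illustration of what declaring an input type means, the following self-contained sketch checks each incoming block against an expected class. It is not the OGSA-DAI implementation: TypedInputSketch and checkBlock are hypothetical names, and IllegalArgumentException is used here only as a stand-in for whatever error the real input classes report.

```java
class TypedInputSketch {
    /**
     * Hypothetical stand-in for a typed input: returns the block cast to the
     * expected type, or rejects it if it is of any other type.
     */
    static <T> T checkBlock(String inputName, Class<T> expectedType, Object block) {
        if (!expectedType.isInstance(block)) {
            String actual = (block == null) ? "null" : block.getClass().getName();
            throw new IllegalArgumentException(
                "Input '" + inputName + "' expected " + expectedType.getName()
                + " but received " + actual);
        }
        return expectedType.cast(block);
    }

    public static void main(String[] args) {
        // An Integer block is accepted.
        Integer ok = checkBlock("initialValue", Integer.class, 42);
        System.out.println(ok);

        // A String block is rejected.
        try {
            checkBlock("initialValue", Integer.class, "forty-two");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The benefit of declaring the type up front is that processIteration can cast its data without repeating these checks.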
We validate the output and obtain a corresponding BlockWriter in
exactly the same way as we did in the previous examples.
We now have to process this data inside the processIteration
method. Ignoring exceptions for simplicity this is implemented as:
protected void processIteration(Object[] iterationData)
{
    Integer initialValue = (Integer) iterationData[0];
    ListIterator values = (ListIterator) iterationData[1];

    // Get the initial value
    int value = initialValue.intValue();

    // Process all the integers in the list
    Integer nextValue;
    while( (nextValue = (Integer) values.nextValue()) != null)
    {
        value += nextValue.intValue();
    }

    // Output the final value
    mOutputBlockWriter.write(new Integer(value));
}
Note that because we provided two items in the array returned by
getIterationInputs we get two items in the array that is passed
to processIteration. The first of these items is the
initialValue input and the second is an iterator that gives access to
the data in the list in the values input. The rest of the code
simply loops through the list getting each value and doing the
required calculation. Note that the end of the list is indicated
by the nextValue method returning null.
It is important to note that the ListIterator object
passed to the processIteration method in this example allows
us to stream the values from the list. The base class does not read the whole list into
memory and then pass an iterator to this data to processIteration.
Instead, the base class simply reads the list begin marker and passes the ListIterator
object to the processIteration method. This
ListIterator object is implemented as a simple wrapper around
a BlockReader. Calls to read data from
ListIterator may therefore block in the same way that calls to
read data from a BlockReader can block. These implementation
details allow the activity to process lists containing millions of objects without ever having
to store the whole list in memory.
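To make the streaming behaviour concrete, here is a minimal, self-contained sketch. It does not use the OGSA-DAI classes: a java.util.concurrent.BlockingQueue stands in for the BlockReader, and StreamingListIterator, LIST_BEGIN and LIST_END are hypothetical names invented for this illustration. The point it shows is that the consumer reads the list begin marker, then pulls one element at a time until the list end marker arrives, so the list is never held in memory as a whole.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class StreamingListSketch {
    /** Hypothetical markers standing in for list begin and list end blocks. */
    static final Object LIST_BEGIN = new Object();
    static final Object LIST_END = new Object();

    /** Stand-in for a list iterator that wraps a blocking block source. */
    static final class StreamingListIterator {
        private final BlockingQueue<Object> source;
        private boolean finished = false;

        StreamingListIterator(BlockingQueue<Object> source) {
            this.source = source;
        }

        /** Returns the next element, or null once the list end marker is read. */
        Object nextValue() throws InterruptedException {
            if (finished) {
                return null;
            }
            Object block = source.take(); // may block until data is available
            if (block == LIST_END) {
                finished = true;
                return null;
            }
            return block;
        }
    }

    /** Sums one list from the source, holding only one element at a time. */
    static long sumOneList(BlockingQueue<Object> source) throws InterruptedException {
        if (source.take() != LIST_BEGIN) {
            throw new IllegalStateException("Expected a list begin marker");
        }
        StreamingListIterator values = new StreamingListIterator(source);
        long sum = 0;
        Object next;
        while ((next = values.nextValue()) != null) {
            sum += (Integer) next;
        }
        return sum;
    }

    public static void main(String[] args) throws Exception {
        // Capacity 2: the producer can never be more than two blocks ahead.
        BlockingQueue<Object> pipe = new ArrayBlockingQueue<>(2);
        Thread producer = new Thread(() -> {
            try {
                pipe.put(LIST_BEGIN);
                for (int i = 1; i <= 1000; i++) {
                    pipe.put(i);
                }
                pipe.put(LIST_END);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        System.out.println(sumOneList(pipe));
        producer.join();
    }
}
```

The sketch prints 500500, the sum of 1 to 1000, even though the queue never holds more than two blocks at once.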
Note that each iteration requires one value from the initialValue
input and one whole list from the values input. What would happen
if the initialValue input receives a different number of values from the
number of lists received by the values input? These situations where
inputs are unmatched are handled by the MatchedIterativeActivity
base class. For a more detailed description of what it means for
an activity to be matched see
Section 24.2.1, “Some definitions”.
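The idea of matched inputs can be illustrated with a small stand-alone sketch. It pairs each initial value with exactly one list and, as an assumption made purely for this illustration, treats a difference in counts as an error. How MatchedIterativeActivity actually reports unmatched inputs is not shown here; MatchedInputsSketch and reduceAll are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class MatchedInputsSketch {
    /**
     * Pairs each initial value with one list and reduces the pair.
     * Assumption for this sketch only: leftover data on either input is an error.
     */
    static List<Integer> reduceAll(List<Integer> initialValues,
                                   List<List<Integer>> lists) {
        Iterator<Integer> initials = initialValues.iterator();
        Iterator<List<Integer>> listIter = lists.iterator();
        List<Integer> results = new ArrayList<>();

        // One iteration consumes one initial value and one whole list.
        while (initials.hasNext() && listIter.hasNext()) {
            int value = initials.next();
            for (int v : listIter.next()) {
                value += v;
            }
            results.add(value);
        }

        // Anything left over on one input means the inputs were unmatched.
        if (initials.hasNext() || listIter.hasNext()) {
            throw new IllegalStateException(
                "Unmatched inputs: one input supplied more data than the other");
        }
        return results;
    }

    public static void main(String[] args) {
        List<List<Integer>> lists = Arrays.asList(
            Arrays.asList(1, 2, 3),
            Arrays.asList(5));
        // Two initial values, two lists: the inputs are matched.
        System.out.println(reduceAll(Arrays.asList(10, 20), lists));
    }
}
```

With two initial values and two lists the sketch prints [16, 25]. Passing a single initial value with the same two lists would raise the IllegalStateException.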
The complete implementation of the server-side List Reduction activity is:
// Copyright (c) The University of Edinburgh, 2007.
// See OGSA-DAI-Licence.txt for licencing information.

package uk.org.ogsadai.tutorials.activity;

import uk.org.ogsadai.activity.ActivityProcessingException;
import uk.org.ogsadai.activity.ActivityTerminatedException;
import uk.org.ogsadai.activity.ActivityUserException;
import uk.org.ogsadai.activity.MatchedIterativeActivity;
import uk.org.ogsadai.activity.io.ActivityInput;
import uk.org.ogsadai.activity.io.ActivityPipeProcessingException;
import uk.org.ogsadai.activity.io.BlockWriter;
import uk.org.ogsadai.activity.io.ListIterator;
import uk.org.ogsadai.activity.io.PipeClosedException;
import uk.org.ogsadai.activity.io.PipeIOException;
import uk.org.ogsadai.activity.io.PipeTerminatedException;
import uk.org.ogsadai.activity.io.TypedActivityInput;
import uk.org.ogsadai.activity.io.TypedListActivityInput;

/**
 * Tutorial activity that reduces the data in a list to something that
 * summarizes that data.
 *
 * The activity has two inputs. One called 'initialValue' that expects Integer
 * objects and one called 'values' that expects a list of Integer objects.
 * The output for each iteration is the initial value plus the value of each
 * of the integers in the list.
 *
 * @author The OGSA-DAI Project Team
 */
public class ListReductionActivity extends MatchedIterativeActivity
{
    /** Copyright notice */
    private static final String COPYRIGHT_NOTICE =
        "Copyright (c) The University of Edinburgh, 2007.";

    /** Block writer used to write the activity's output */
    private BlockWriter mOutputBlockWriter;

    protected ActivityInput[] getIterationInputs()
    {
        return new ActivityInput[] {
            new TypedActivityInput("initialValue", Integer.class),
            new TypedListActivityInput("values", Integer.class)};
    }

    /**
     * Pre-processing.
     */
    protected void preprocess() throws ActivityUserException,
        ActivityProcessingException, ActivityTerminatedException
    {
        validateOutput("output");
        mOutputBlockWriter = getOutput("output");
    }

    /**
     * Process each iteration.
     */
    protected void processIteration(Object[] iterationData)
        throws ActivityProcessingException, ActivityTerminatedException,
               ActivityUserException
    {
        try
        {
            Integer initialValue = (Integer) iterationData[0];
            ListIterator values = (ListIterator) iterationData[1];

            // Get the initial value
            int value = initialValue.intValue();

            // Process all the integers in the list
            Integer nextValue;
            while( (nextValue = (Integer) values.nextValue()) != null)
            {
                value += nextValue.intValue();
            }

            // Output the final value
            mOutputBlockWriter.write(new Integer(value));
        }
        catch (PipeClosedException e)
        {
            // Consumer does not want any more data, just stop.
        }
        catch (PipeIOException e)
        {
            throw new ActivityPipeProcessingException(e);
        }
        catch (PipeTerminatedException e)
        {
            throw new ActivityTerminatedException();
        }
    }

    /**
     * No post-processing.
     */
    protected void postprocess() throws ActivityUserException,
        ActivityProcessingException, ActivityTerminatedException
    {
        // No post-processing
    }
}
The client-side activity implementation for the List Reduction is very
similar to the client-side activity implementation for the Hello Name
activity. The only method that requires explanation is addValues
which is implemented as:
/**
 * Adds a list of values to the values input.
 *
 * @param values the values to include in the list.
 */
public void addValues(int[] values)
{
    mValuesInput.add(ListBegin.VALUE);
    for (int i=0; i<values.length; ++i)
    {
        mValuesInput.add(new IntegerData(values[i]));
    }
    mValuesInput.add(ListEnd.VALUE);
}
This method adds data to the values input.
The values input expects
to receive lists. Lists are identified in OGSA-DAI using list begin
and list end markers. We therefore must place these markers around
the data to form the list. This method hides the use of these list
markers from the client application developer.
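The effect of the markers on the data stream can be pictured with a small stand-alone sketch. It does not use the client toolkit: the strings LIST_BEGIN and LIST_END are hypothetical stand-ins for ListBegin.VALUE and ListEnd.VALUE, and a java.util.List stands in for the input. It only shows the order in which blocks would be added.

```java
import java.util.ArrayList;
import java.util.List;

class ListMarkerSketch {
    /** Hypothetical stand-ins for the list begin and list end markers. */
    static final String LIST_BEGIN = "LIST_BEGIN";
    static final String LIST_END = "LIST_END";

    /** Wraps the values in begin and end markers, as addValues does. */
    static List<Object> toBlocks(int[] values) {
        List<Object> blocks = new ArrayList<>();
        blocks.add(LIST_BEGIN);
        for (int v : values) {
            blocks.add(v);
        }
        blocks.add(LIST_END);
        return blocks;
    }

    public static void main(String[] args) {
        System.out.println(toBlocks(new int[] {1, 2, 3}));
    }
}
```

The sketch prints [LIST_BEGIN, 1, 2, 3, LIST_END]. Calling addValues twice therefore places two complete, separately delimited lists on the values input, one per iteration of the server-side activity.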
The complete client-toolkit activity implementation is:
// Copyright (c) The University of Edinburgh, 2007.
// See OGSA-DAI-Licence.txt for licencing information.

package uk.org.ogsadai.tutorial.activity.client;

import uk.org.ogsadai.activity.ActivityName;
import uk.org.ogsadai.client.toolkit.Activity;
import uk.org.ogsadai.client.toolkit.ActivityOutput;
import uk.org.ogsadai.client.toolkit.SingleActivityOutput;
import uk.org.ogsadai.client.toolkit.activity.ActivityInput;
import uk.org.ogsadai.client.toolkit.activity.BaseActivity;
import uk.org.ogsadai.client.toolkit.activity.SimpleActivityInput;
import uk.org.ogsadai.client.toolkit.activity.SimpleActivityOutput;
import uk.org.ogsadai.client.toolkit.exception.ActivityIOIllegalStateException;
import uk.org.ogsadai.client.toolkit.exception.DataSourceUsageException;
import uk.org.ogsadai.client.toolkit.exception.DataStreamErrorException;
import uk.org.ogsadai.client.toolkit.exception.UnexpectedDataValueException;
import uk.org.ogsadai.data.IntegerData;
import uk.org.ogsadai.data.ListBegin;
import uk.org.ogsadai.data.ListEnd;

/**
 * Client toolkit activity used to call the ListReduction activity.
 *
 * @author The OGSA-DAI Project Team
 */
public class ListReduction extends BaseActivity implements Activity
{
    /** Copyright notice */
    private static final String COPYRIGHT_NOTICE =
        "Copyright (c) The University of Edinburgh, 2007.";

    /** Default activity name */
    public final static ActivityName DEFAULT_ACTIVITY_NAME =
        new ActivityName("uk.org.ogsadai.tutorial.ListReduction");

    /** The initial value */
    private ActivityInput mInitialValueInput;

    /** The values */
    private ActivityInput mValuesInput;

    /** The activity output */
    private ActivityOutput mOutput;

    /**
     * Constructor.
     */
    public ListReduction()
    {
        super(DEFAULT_ACTIVITY_NAME);
        mInitialValueInput = new SimpleActivityInput("initialValue");
        mValuesInput = new SimpleActivityInput("values");
        mOutput = new SimpleActivityOutput("output");
    }

    /**
     * Adds a value to the initialValue input.
     *
     * @param initialValue the initial value.
     */
    public void addInitialValue(int initialValue)
    {
        mInitialValueInput.add(new IntegerData(initialValue));
    }

    /**
     * Connects the initialValue input to the given output.
     *
     * @param output output to connect to.
     */
    public void connectInitialValueInput(SingleActivityOutput output)
    {
        mInitialValueInput.connect(output);
    }

    /**
     * Adds a list of values to the values input.
     *
     * @param values the values to include in the list.
     */
    public void addValues(int[] values)
    {
        mValuesInput.add(ListBegin.VALUE);
        for (int i=0; i<values.length; ++i)
        {
            mValuesInput.add(new IntegerData(values[i]));
        }
        mValuesInput.add(ListEnd.VALUE);
    }

    /**
     * Connects the values input to the given output.
     *
     * @param output output to connect to.
     */
    public void connectValuesInput(SingleActivityOutput output)
    {
        mValuesInput.connect(output);
    }

    /**
     * Gets the output so that it can be connected to the input of other
     * activities.
     *
     * @return the activity output.
     */
    public SingleActivityOutput getOutput()
    {
        return mOutput.getSingleActivityOutputs()[0];
    }

    /**
     * Returns whether the activity has a next output value.
     *
     * @return <tt>true</tt> if there is another output value, <tt>false</tt>
     *         otherwise.
     *
     * @throws DataStreamErrorException
     *     if there is an error on the data stream.
     * @throws UnexpectedDataValueException
     *     if there is an unexpected data value on the data stream.
     * @throws DataSourceUsageException
     *     if there is an error reading from a data source.
     */
    public boolean hasNextOutput()
        throws DataStreamErrorException,
               UnexpectedDataValueException,
               DataSourceUsageException
    {
        return mOutput.getDataValueIterator().hasNext();
    }

    /**
     * Gets the next output value.
     *
     * @return the next output value.
     *
     * @throws DataStreamErrorException
     *     if there is an error on the data stream.
     * @throws UnexpectedDataValueException
     *     if there is an unexpected data value on the data stream.
     * @throws DataSourceUsageException
     *     if there is an error reading from a data source.
     */
    public int nextOutput()
        throws DataStreamErrorException,
               UnexpectedDataValueException,
               DataSourceUsageException
    {
        return mOutput.getDataValueIterator().nextAsInt();
    }

    /**
     * Gets the activity inputs.
     */
    protected ActivityInput[] getInputs()
    {
        return new ActivityInput[]{mInitialValueInput, mValuesInput};
    }

    /**
     * Gets the activity outputs.
     */
    protected ActivityOutput[] getOutputs()
    {
        return new ActivityOutput[]{mOutput};
    }

    /**
     * Validates the data of the inputs and outputs.
     */
    protected void validateIOState() throws ActivityIOIllegalStateException
    {
        // No further validation to do
    }
}
An example client application that uses this activity is shown here.
You may have to change the base services URL
in the main method depending on your
server. By default this is:
OGSA-DAI GT Default:
http://localhost:8080/wsrf/services/dai/
// Copyright (c) The University of Edinburgh, 2007.
// See OGSA-DAI-Licence.txt for licencing information.

package uk.org.ogsadai.tutorial.activity.client.apps;

import java.net.URL;

import uk.org.ogsadai.client.toolkit.DataRequestExecutionResource;
import uk.org.ogsadai.client.toolkit.PipelineWorkflow;
import uk.org.ogsadai.client.toolkit.RequestExecutionType;
import uk.org.ogsadai.client.toolkit.ServerProxy;
import uk.org.ogsadai.client.toolkit.activities.delivery.DeliverToRequestStatus;
import uk.org.ogsadai.client.toolkit.exception.RequestExecutionException;
import uk.org.ogsadai.resource.ResourceID;
import uk.org.ogsadai.tutorial.activity.client.ListReduction;

/**
 * Application for the ListReduction activity tutorial.
 *
 * @author The OGSA-DAI Project Team
 */
public class ListReductionApp
{
    /** Copyright notice */
    private static final String COPYRIGHT_NOTICE =
        "Copyright (c) The University of Edinburgh, 2007.";

    /**
     * Main method.
     *
     * @param args command line arguments.
     *
     * @throws Exception if an unexpected error occurs
     */
    public static void main(String[] args) throws Exception
    {
        URL serverBaseUrl = new URL("http://localhost:8080/wsrf/services/dai/");
        ResourceID drerId = new ResourceID("DataRequestExecutionResource");

        ServerProxy serverProxy = new ServerProxy();
        serverProxy.setDefaultBaseServicesURL(serverBaseUrl);
        DataRequestExecutionResource drer =
            serverProxy.getDataRequestExecutionResource(drerId);

        // Create the activities
        ListReduction listReduction = new ListReduction();
        DeliverToRequestStatus deliverToRequestStatus =
            new DeliverToRequestStatus();

        // Add some values to the ListReduction activity inputs
        listReduction.addInitialValue(100);
        listReduction.addInitialValue(200);
        listReduction.addValues(
            new int[]{ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10});
        listReduction.addValues(
            new int[]{ 1, 10, 100, 1000});

        // Connect the output of ListReduction to DeliverToRequestStatus
        deliverToRequestStatus.connectInput(listReduction.getOutput());

        // Create the workflow
        PipelineWorkflow pipeline = new PipelineWorkflow();
        pipeline.add(listReduction);
        pipeline.add(deliverToRequestStatus);

        // Execute the workflow
        try
        {
            drer.execute(pipeline, RequestExecutionType.SYNCHRONOUS);
        }
        catch( RequestExecutionException e)
        {
            System.out.println("There was an error executing the workflow");
            System.out.println(e.getRequestResource().getRequestStatus());
            throw e;
        }

        // Get the result and display it
        while( listReduction.hasNextOutput())
        {
            System.out.println(listReduction.nextOutput());
        }
    }
}
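Note that you may need to alter the server base URL to correspond to your server installation. The output from this client is:
155
1311
The first value is 100 + (1 + 2 + ... + 10) = 100 + 55 = 155 and the second is 200 + (1 + 10 + 100 + 1000) = 200 + 1111 = 1311.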
To run this client successfully you must install the server-side
activity on your server. To do this, compile your server-side activity
into a jar (for example, ListReduction.jar),
then follow the instructions in Section 16.1.8, “Deploying an activity” and
Section 16.1.9, “Extending the supported activities of a resource”. When following these instructions
use the following values:
dai.activity.id: uk.org.ogsadai.tutorial.ListReduction
dai.activity.name: uk.org.ogsadai.tutorial.ListReduction
dai.activity.class: uk.org.ogsadai.tutorials.activity.ListReductionActivity
dai.activity.description: "List Reduction activity"
The dai.activity.id and
dai.activity.name values must match the default
activity name used in the constructor of the client-side activity implementation class
ListReduction. The dai.activity.class
value must be the full class name of the server-side activity class. The
dai.activity.description can be any suitable
textual description.