This example activity, called List Processing, takes a list as input and
outputs the list with its contents slightly altered. The activity has
two inputs. The first input is called greeting and expects to receive
values of type String. The second input is called names and expects
to receive lists of String objects. The output is called output. For
each list received, the activity prefixes each value in the list with
the greeting string followed by a space, and outputs the new list.
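The transformation itself can be pictured in isolation using plain Java collections. The sketch below does not use any OGSA-DAI classes. The class name GreetingSketch and the method prefixAll are hypothetical names chosen purely for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustration only: plain Java, no OGSA-DAI classes involved.
final class GreetingSketch
{
    // Returns a new list in which every name is prefixed by the greeting
    // and a single space, mirroring what the activity does to each list.
    static List<String> prefixAll(String greeting, List<String> names)
    {
        List<String> result = new ArrayList<String>();
        for (String name : names)
        {
            result.add(greeting + " " + name);
        }
        return result;
    }

    public static void main(String[] args)
    {
        List<String> names = Arrays.asList("Rod", "Jane", "Freddy");
        for (String line : prefixAll("Hello", names))
        {
            System.out.println(line);
        }
    }
}
```

The real activity applies the same per-value change, but reads from and writes to data streams rather than in-memory lists.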
The server-side activity implementation class inherits from the
MatchedIterativeActivity base class and is very similar to the
server-side activity implementations in the previous examples. The only
major difference is that this activity outputs a list, so it must also
output the list begin and list end markers. The processIteration
method (with exception handling omitted for simplicity) is:
protected void processIteration(Object[] iterationData)
{
    String greeting = (String) iterationData[0];
    ListIterator values = (ListIterator) iterationData[1];

    // Output the list begin marker
    mOutputBlockWriter.write(ControlBlock.LIST_BEGIN);

    // Process the contents of the list
    String name;
    while ((name = (String) values.nextValue()) != null)
    {
        mOutputBlockWriter.write(greeting + " " + name);
    }

    // Output the list end marker
    mOutputBlockWriter.write(ControlBlock.LIST_END);
}
This is very similar to the processIteration method in the List Reduction
example. Again a ListIterator is used to access the data in the list.
In this implementation we write a list begin marker to the output before
we start to process the list. We then write a value to the output for
each object read from the list, and finally we write a list end marker.
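The begin, values, end sequence described above can be sketched without a server. In the sketch below a java.util.List stands in for the output pipe and a local enum stands in for the marker blocks. Both are assumptions made for illustration, as are the names ListMarkerSketch, Marker and writeList; none of them are OGSA-DAI classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustration only: a java.util.List stands in for the output pipe and a
// local enum stands in for the list marker blocks.
final class ListMarkerSketch
{
    // Hypothetical stand-ins for the list begin and list end markers.
    enum Marker { LIST_BEGIN, LIST_END }

    // Writes one output list: begin marker, one value per name, end marker.
    static void writeList(List<Object> pipe, String greeting, List<String> names)
    {
        pipe.add(Marker.LIST_BEGIN);
        for (String name : names)
        {
            pipe.add(greeting + " " + name);
        }
        pipe.add(Marker.LIST_END);
    }

    public static void main(String[] args)
    {
        List<Object> pipe = new ArrayList<Object>();
        writeList(pipe, "Hello", Arrays.asList("Rod", "Jane"));
        // Every block written, markers included, is visible to the consumer.
        for (Object block : pipe)
        {
            System.out.println(block);
        }
    }
}
```

Because the markers travel in the same stream as the values, a consumer can tell where one list stops and the next begins without knowing the list lengths in advance.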
The complete implementation of the server-side List Processing activity is:
// Copyright (c) The University of Edinburgh, 2007.
// See OGSA-DAI-Licence.txt for licencing information.
package uk.org.ogsadai.tutorials.activity;
import uk.org.ogsadai.activity.ActivityProcessingException;
import uk.org.ogsadai.activity.ActivityTerminatedException;
import uk.org.ogsadai.activity.ActivityUserException;
import uk.org.ogsadai.activity.MatchedIterativeActivity;
import uk.org.ogsadai.activity.io.ActivityInput;
import uk.org.ogsadai.activity.io.ActivityPipeProcessingException;
import uk.org.ogsadai.activity.io.BlockWriter;
import uk.org.ogsadai.activity.io.ControlBlock;
import uk.org.ogsadai.activity.io.ListIterator;
import uk.org.ogsadai.activity.io.PipeClosedException;
import uk.org.ogsadai.activity.io.PipeIOException;
import uk.org.ogsadai.activity.io.PipeTerminatedException;
import uk.org.ogsadai.activity.io.TypedActivityInput;
import uk.org.ogsadai.activity.io.TypedListActivityInput;
/**
* Tutorial activity that processes the data in a list.
* <p>
* The activity has two inputs. One called 'greeting' takes <tt>String</tt>
* objects and one called 'names' takes a list of <tt>String</tt> objects.
* The output (called 'output') is a list of <tt>String</tt> objects that
* is similar to the input list except that the <tt>String</tt> values are
* now prefixed by the value of the 'greeting' input followed by a space.
* <p>
* For example, if the greeting input is <tt>"Hello"</tt> and the values in
* the input 'names' list are <tt>"Rod"</tt>, <tt>"Jane"</tt> and
* <tt>"Freddy"</tt> then the values on the output list will be
* <tt>"Hello Rod"</tt>, <tt>"Hello Jane"</tt> and <tt>"Hello Freddy"</tt>.
*
* @author The OGSA-DAI Project Team
*/
public class ListProcessingActivity extends MatchedIterativeActivity
{
/** Copyright notice */
private static final String COPYRIGHT_NOTICE =
"Copyright (c) The University of Edinburgh, 2007.";
/** Block writer used to write the activity's output */
private BlockWriter mOutputBlockWriter;
/**
* Provides the input details.
*/
protected ActivityInput[] getIterationInputs()
{
return new ActivityInput[] {
new TypedActivityInput("greeting", String.class),
new TypedListActivityInput("names", String.class)};
}
/**
* Pre-processing.
*/
protected void preprocess() throws ActivityUserException,
ActivityProcessingException, ActivityTerminatedException
{
validateOutput("output");
mOutputBlockWriter = getOutput("output");
}
/**
* Process each iteration.
*/
protected void processIteration(Object[] iterationData)
throws ActivityProcessingException, ActivityTerminatedException,
ActivityUserException
{
try
{
String greeting = (String) iterationData[0];
ListIterator values = (ListIterator) iterationData[1];
// Output the list begin marker
mOutputBlockWriter.write(ControlBlock.LIST_BEGIN);
// Process the contents of the list
String name;
while( (name = (String) values.nextValue()) != null)
{
mOutputBlockWriter.write(greeting + " " + name);
}
// Output the list end marker
mOutputBlockWriter.write(ControlBlock.LIST_END);
}
catch (PipeClosedException e)
{
// Consumer does not want any more data, just stop.
}
catch (PipeIOException e)
{
throw new ActivityPipeProcessingException(e);
}
catch (PipeTerminatedException e)
{
throw new ActivityTerminatedException();
}
}
/**
* Post-processing.
*/
protected void postprocess() throws ActivityUserException,
ActivityProcessingException, ActivityTerminatedException
{
// No post-processing
}
}
The client-side implementation of the List Processing activity is much the same as the client-side implementation of the List Reduction activity. The only major difference is that lists, rather than single values, are now written to the output, so the nextOutput method needs to reflect this. The nextOutput method is implemented as:
/**
 * Gets the next output value.
 *
 * @return a <tt>DataIterator</tt> that gives access to a list of
 *         <tt>java.lang.String</tt> objects.
 *
 * @throws DataStreamErrorException
 *     if there is an error on the data stream.
 * @throws UnexpectedDataValueException
 *     if there is an unexpected data value on the data stream.
 * @throws DataSourceUsageException
 *     if there is an error reading from a data source.
 */
public DataIterator nextOutput()
    throws DataStreamErrorException,
           UnexpectedDataValueException,
           DataSourceUsageException
{
    return new DataListIterator(
        mOutput.getDataValueIterator(), String.class);
}
This method returns a DataIterator object that can be used to access
the String objects in the list. Because we know the list should only
contain String objects, we can specify this in the second parameter
to the DataListIterator constructor. The DataListIterator class will
therefore handle the reporting of any errors if the objects received
are not Strings.
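The general idea of an iterator that is told its expected element type, and that reports a problem when a value of another type arrives, can be sketched in plain Java. The class below is a hypothetical illustration and is not how DataListIterator is implemented. In particular, the use of IllegalStateException is an assumption made to keep the sketch self-contained.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Illustration only: a hypothetical iterator that enforces an element type.
final class TypedIteratorSketch<T> implements Iterator<T>
{
    private final Iterator<?> source;
    private final Class<T> expectedType;

    TypedIteratorSketch(Iterator<?> source, Class<T> expectedType)
    {
        this.source = source;
        this.expectedType = expectedType;
    }

    public boolean hasNext()
    {
        return source.hasNext();
    }

    // Returns the next value, or fails if it is not of the expected type.
    public T next()
    {
        Object value = source.next();
        if (!expectedType.isInstance(value))
        {
            throw new IllegalStateException(
                "Expected " + expectedType.getName() + " but got " + value);
        }
        return expectedType.cast(value);
    }

    public void remove()
    {
        throw new UnsupportedOperationException();
    }

    public static void main(String[] args)
    {
        List<Object> values =
            Arrays.<Object>asList("Hello Rod", Integer.valueOf(42));
        Iterator<String> it =
            new TypedIteratorSketch<String>(values.iterator(), String.class);
        System.out.println(it.next());
        try
        {
            it.next();
        }
        catch (IllegalStateException e)
        {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

Putting the type check inside the iterator means that calling code can cast each value to String without repeating the check itself.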
The complete client-toolkit activity implementation is:
// Copyright (c) The University of Edinburgh, 2007.
// See OGSA-DAI-Licence.txt for licencing information.
package uk.org.ogsadai.tutorial.activity.client;
import uk.org.ogsadai.activity.ActivityName;
import uk.org.ogsadai.client.toolkit.Activity;
import uk.org.ogsadai.client.toolkit.ActivityOutput;
import uk.org.ogsadai.client.toolkit.DataIterator;
import uk.org.ogsadai.client.toolkit.DataListIterator;
import uk.org.ogsadai.client.toolkit.SingleActivityOutput;
import uk.org.ogsadai.client.toolkit.activity.ActivityInput;
import uk.org.ogsadai.client.toolkit.activity.BaseActivity;
import uk.org.ogsadai.client.toolkit.activity.SimpleActivityInput;
import uk.org.ogsadai.client.toolkit.activity.SimpleActivityOutput;
import uk.org.ogsadai.client.toolkit.exception.ActivityIOIllegalStateException;
import uk.org.ogsadai.client.toolkit.exception.DataSourceUsageException;
import uk.org.ogsadai.client.toolkit.exception.DataStreamErrorException;
import uk.org.ogsadai.client.toolkit.exception.UnexpectedDataValueException;
import uk.org.ogsadai.data.ListBegin;
import uk.org.ogsadai.data.ListEnd;
import uk.org.ogsadai.data.StringData;
/**
* Client toolkit activity used to call the ListProcessing activity.
*
* @author The OGSA-DAI Project Team
*/
public class ListProcessing extends BaseActivity implements Activity
{
/** Copyright notice */
private static final String COPYRIGHT_NOTICE =
"Copyright (c) The University of Edinburgh, 2007.";
/** Default activity name */
public final static ActivityName DEFAULT_ACTIVITY_NAME =
new ActivityName("uk.org.ogsadai.tutorial.ListProcessing");
/** The greeting input */
private ActivityInput mGreetingInput;
/** The names input */
private ActivityInput mNamesInput;
/** The activity output */
private ActivityOutput mOutput;
/**
* Constructor.
*/
public ListProcessing()
{
super(DEFAULT_ACTIVITY_NAME);
mGreetingInput = new SimpleActivityInput("greeting");
mNamesInput = new SimpleActivityInput("names");
mOutput = new SimpleActivityOutput("output");
}
/**
* Adds a value to the greeting input.
*
* @param greeting the greeting.
*/
public void addGreeting(String greeting)
{
mGreetingInput.add(new StringData(greeting));
}
/**
* Connects the greeting input to the given output.
*
* @param output output to connect to.
*/
public void connectGreetingInput(SingleActivityOutput output)
{
mGreetingInput.connect(output);
}
/**
* Adds a list of names to the names input.
*
* @param names the names to include in the list.
*/
public void addNames(String[] names)
{
mNamesInput.add(ListBegin.VALUE);
for (int i=0; i<names.length; ++i)
{
mNamesInput.add(new StringData(names[i]));
}
mNamesInput.add(ListEnd.VALUE);
}
/**
* Connects the names input to the given output.
*
* @param output output to connect to.
*/
public void connectNamesInput(SingleActivityOutput output)
{
mNamesInput.connect(output);
}
/**
* Gets the output so that it can be connected to the input of other
* activities.
*
* @return the activity output.
*/
public SingleActivityOutput getOutput()
{
return mOutput.getSingleActivityOutputs()[0];
}
/**
* Gets if the activity has a next output value.
*
* @return <tt>true</tt> if there is another output value, <tt>false</tt>
* otherwise.
*
* @throws DataStreamErrorException
* if there is an error on the data stream.
* @throws UnexpectedDataValueException
* if there is an unexpected data value on the data stream.
* @throws DataSourceUsageException
* if there is an error reading from a data source.
*/
public boolean hasNextOutput()
throws DataStreamErrorException,
UnexpectedDataValueException,
DataSourceUsageException
{
return mOutput.getDataValueIterator().hasNext();
}
/**
* Gets the next output value.
*
* @return a <tt>DataIterator</tt> that gives access to a list of
* <tt>java.lang.String</tt> objects.
*
* @throws DataStreamErrorException
* if there is an error on the data stream.
* @throws UnexpectedDataValueException
* if there is an unexpected data value on the data stream.
* @throws DataSourceUsageException
* if there is an error reading from a data source.
*/
public DataIterator nextOutput()
throws DataStreamErrorException,
UnexpectedDataValueException,
DataSourceUsageException
{
return new DataListIterator(
mOutput.getDataValueIterator(),String.class);
}
/**
* Gets the activity inputs.
*/
protected ActivityInput[] getInputs()
{
return new ActivityInput[]{mGreetingInput, mNamesInput};
}
/**
* Gets the activity outputs.
*/
protected ActivityOutput[] getOutputs()
{
return new ActivityOutput[]{mOutput};
}
/**
* Validates the data of the inputs and outputs.
*/
protected void validateIOState() throws ActivityIOIllegalStateException
{
// No further validation to do
}
}
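An example client application that uses this activity is shown here.
You may have to change the base services URL in the main method
depending on your server. The OGSA-DAI Axis default is:
http://localhost:8080/dai/services/
The listing uses http://localhost:8080/wsrf/services/dai/ as its base
services URL, so replace that value if it does not match your server.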
// Copyright (c) The University of Edinburgh, 2007.
// See OGSA-DAI-Licence.txt for licencing information.
package uk.org.ogsadai.tutorial.activity.client.apps;
import java.net.URL;
import uk.org.ogsadai.client.toolkit.DataIterator;
import uk.org.ogsadai.client.toolkit.DataRequestExecutionResource;
import uk.org.ogsadai.client.toolkit.PipelineWorkflow;
import uk.org.ogsadai.client.toolkit.RequestExecutionType;
import uk.org.ogsadai.client.toolkit.ServerProxy;
import uk.org.ogsadai.client.toolkit.activities.delivery.DeliverToRequestStatus;
import uk.org.ogsadai.client.toolkit.exception.RequestExecutionException;
import uk.org.ogsadai.resource.ResourceID;
import uk.org.ogsadai.tutorial.activity.client.ListProcessing;
/**
* Application for the List Processing activity tutorial.
*
* @author The OGSA-DAI Project Team
*/
public class ListProcessingApp
{
/** Copyright notice */
private static final String COPYRIGHT_NOTICE =
"Copyright (c) The University of Edinburgh, 2007.";
/**
* Main method.
*
* @param args command line arguments.
*
* @throws Exception if an unexpected error occurs
*/
public static void main(String[] args) throws Exception
{
URL serverBaseUrl = new URL("http://localhost:8080/wsrf/services/dai/");
ResourceID drerId = new ResourceID("DataRequestExecutionResource");
ServerProxy serverProxy = new ServerProxy();
serverProxy.setDefaultBaseServicesURL(serverBaseUrl);
DataRequestExecutionResource drer =
serverProxy.getDataRequestExecutionResource(drerId);
// Create the activities
ListProcessing listProcessing = new ListProcessing();
DeliverToRequestStatus deliverToRequestStatus =
new DeliverToRequestStatus();
// Add some values to the ListProcessing activity inputs
listProcessing.addGreeting("Hello");
listProcessing.addGreeting("Hi");
listProcessing.addNames(
new String[]{ "Roderick", "Frederick" } );
listProcessing.addNames(
new String[]{ "Rod", "Jane", "Freddy" } );
// Connect the output of ListProcessing to DeliverToRequestStatus
deliverToRequestStatus.connectInput(listProcessing.getOutput());
// Create the workflow
PipelineWorkflow pipeline = new PipelineWorkflow();
pipeline.add(listProcessing);
pipeline.add(deliverToRequestStatus);
// Execute the workflow
try
{
drer.execute(pipeline, RequestExecutionType.SYNCHRONOUS);
}
catch( RequestExecutionException e)
{
System.out.println("There was an error executing the workflow");
System.out.println(e.getRequestResource().getRequestStatus());
throw e;
}
// Get the result and display it
while(listProcessing.hasNextOutput())
{
DataIterator dataIterator = listProcessing.nextOutput();
System.out.println("list begin");
while( dataIterator.hasNext())
{
System.out.println((String) dataIterator.next());
}
System.out.println("list end");
}
}
}
Note that you may need to alter the value used for the server base URL to correspond to your server installation. The output from this client application is:
list begin
Hello Roderick
Hello Frederick
list end
list begin
Hi Rod
Hi Jane
Hi Freddy
list end
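The output shows that the first greeting is applied to the first list of names and the second greeting to the second list. That pairing can be mimicked in plain Java. The sketch below is a simulation only and uses no OGSA-DAI classes. The names MatchedIterationSketch and run are hypothetical, and rejecting unequal input counts with an IllegalArgumentException is a simplification chosen for this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustration only: simulates the pairing of inputs in plain Java.
final class MatchedIterationSketch
{
    // Pairs the i-th greeting with the i-th list of names and returns the
    // lines that the client application prints for that pairing.
    static List<String> run(List<String> greetings,
                            List<List<String>> nameLists)
    {
        // Simplification: this sketch simply rejects unequal input counts.
        if (greetings.size() != nameLists.size())
        {
            throw new IllegalArgumentException(
                "Each greeting needs exactly one list of names");
        }
        List<String> lines = new ArrayList<String>();
        for (int i = 0; i < greetings.size(); ++i)
        {
            lines.add("list begin");
            for (String name : nameLists.get(i))
            {
                lines.add(greetings.get(i) + " " + name);
            }
            lines.add("list end");
        }
        return lines;
    }

    public static void main(String[] args)
    {
        List<List<String>> nameLists = new ArrayList<List<String>>();
        nameLists.add(Arrays.asList("Roderick", "Frederick"));
        nameLists.add(Arrays.asList("Rod", "Jane", "Freddy"));
        for (String line : run(Arrays.asList("Hello", "Hi"), nameLists))
        {
            System.out.println(line);
        }
    }
}
```

Running the sketch with the same greetings and names as the client application prints the same nine lines as the output shown above.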
To run this client successfully you must install the server-side
activity on your server. To do this, compile your server-side activity
to a jar (for example, ListProcessing.jar), then follow the instructions
at Section 16.1.8, “Deploying an activity” and
Section 16.1.9, “Extending the supported activities of a resource”. When following these instructions
use the following values:
dai.activity.id: uk.org.ogsadai.tutorial.ListProcessing
dai.activity.name: uk.org.ogsadai.tutorial.ListProcessing
dai.activity.class: uk.org.ogsadai.tutorials.activity.ListProcessingActivity
dai.activity.description: "List Processing activity"
The dai.activity.id and
dai.activity.name values must match the default
activity name used in the constructor of the client-side activity implementation class
ListProcessing. The dai.activity.class
value must be the full class name of the server-side activity class. The
dai.activity.description can be any suitable
textual description.