This example activity processes tuples as received from the output of
an SQL query. The activity has two inputs. The first input is called
columnName and expects to receive objects
of type String. The second input is
called tuples and expects to receive a
list of objects of type uk.org.ogsadai.tuple.Tuple
which is the OGSA-DAI representation of a tuple.
The activity
has one output called result. For each value
received in the columnName
input the activity processes the corresponding list of tuples and
outputs the same list of tuples but with the values in the column
identified by the columnName input changed
to upper case.
The tuples input actually expects to receive a list whose first object
is not a Tuple but an instance of
uk.org.ogsadai.metadata.MetadataWrapper.
This class is used to identify
metadata objects that are treated in a special way by some activities.
The MetadataWrapper object contains an object of type
uk.org.ogsadai.tuple.TupleMetadata that holds the
metadata describing the columns of the tuples. So what is referred to as a list
of tuples actually looks like:
[<Metadata>, <Tuple>, <Tuple>, ....].
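The layout above can be pictured with plain Java collections. The following is a stand-alone sketch only: ColumnNames, metadataOf and rowsOf are hypothetical stand-ins invented for this illustration and are not part of OGSA-DAI. In a real activity the MetadataWrapper and Tuple objects arrive through the activity input rather than in a java.util.List.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustration only: plain Java stand-ins, not OGSA-DAI classes.
class TupleListLayoutSketch
{
    /** Hypothetical stand-in for the metadata item at the head of the list. */
    static final class ColumnNames
    {
        final List<String> names;

        ColumnNames(String... names)
        {
            this.names = Arrays.asList(names);
        }
    }

    /** Returns the leading metadata item, failing if the list does not start with one. */
    static ColumnNames metadataOf(List<Object> tupleList)
    {
        if (tupleList.isEmpty() || !(tupleList.get(0) instanceof ColumnNames))
        {
            throw new IllegalArgumentException("tuple list must start with metadata");
        }
        return (ColumnNames) tupleList.get(0);
    }

    /** Returns a copy of everything after the metadata item, that is, the rows. */
    static List<Object> rowsOf(List<Object> tupleList)
    {
        metadataOf(tupleList); // checks the layout before slicing
        return new ArrayList<Object>(tupleList.subList(1, tupleList.size()));
    }

    public static void main(String[] args)
    {
        List<Object> tupleList = new ArrayList<Object>();
        tupleList.add(new ColumnNames("id", "name"));    // <Metadata>
        tupleList.add(Arrays.<Object>asList(1, "Ally")); // <Tuple>
        tupleList.add(Arrays.<Object>asList(2, "Amy"));  // <Tuple>

        System.out.println(metadataOf(tupleList).names); // prints [id, name]
        System.out.println(rowsOf(tupleList).size());    // prints 2
    }
}
```

The point of the sketch is that a consumer must treat the first item differently from the rest, which is the job the metadata wrapper does in the real tuple list.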
The server-side Tuple Processing activity implementation extends
the MatchedIterativeActivity base class. The
getIterationInputs
method is implemented as:
protected ActivityInput[] getIterationInputs()
{
    return new ActivityInput[] {
        new TypedActivityInput("columnName", String.class),
        new TupleListActivityInput("tuples")};
}
Here we use the TupleListActivityInput which is a
special class used for
inputs that expect to receive lists of tuples.
The processIteration method is
implemented (with exceptions removed for simplicity) as:
protected void processIteration(Object[] iterationData)
{
    String columnName = (String) iterationData[0];
    TupleListIterator tupleListIterator =
        (TupleListIterator) iterationData[1];
    // Get the tuple metadata - this is held in a metadata wrapper
    MetadataWrapper metadataWrapper =
        tupleListIterator.getMetadataWrapper();
    TupleMetadata tupleMetadata =
        (TupleMetadata) metadataWrapper.getMetadata();
    // Find details of the column we wish to change
    int columnIndex;
    try
    {
        columnIndex =
            tupleMetadata.getColumnMetadataPosition(columnName);
    }
    catch(ColumnNotFoundException e)
    {
        throw new ActivityUserException(e);
    }
    // Write the output list begin and the metadata
    mResultBlockWriter.write(ControlBlock.LIST_BEGIN);
    mResultBlockWriter.write(metadataWrapper);
    // Process the tuples
    Tuple tuple;
    while( (tuple = (Tuple) tupleListIterator.nextValue()) != null)
    {
        int columnCount = tupleMetadata.getColumnCount();
        ArrayList elements = new ArrayList(columnCount);
        for (int i=0; i<columnCount; ++i)
        {
            Object columnValue = tuple.getObject(i);
            if (i==columnIndex)
            {
                columnValue = columnValue.toString().toUpperCase();
            }
            elements.add(i, columnValue);
        }
        Tuple newTuple = new SimpleTuple(elements);
        mResultBlockWriter.write(newTuple);
    }
    // Write the list end marker
    mResultBlockWriter.write(ControlBlock.LIST_END);
}
The first part of this method obtains the MetadataWrapper
object and
extracts the TupleMetadata object from it. The
TupleMetadata
object is then used to determine the index of the column with the
same name as the value received from the columnName input. We
can then process the list remembering to write the list begin and
list end markers and also to write the MetadataWrapper object as
the first object in the list. The change we make to the tuples
does not change the metadata so we can output the same MetadataWrapper
as we received from the input. The Tuple interface does not allow
elements of the tuple to be altered so we have to construct a new
Tuple object and copy the contents of the original tuple into it
remembering to change the appropriate column value to upper case as we do so.
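The copy-and-change step can be tried in isolation. The sketch below is a minimal stand-alone illustration, assuming an unmodifiable java.util.List as a stand-in for a read-only Tuple; CopyWithUpperCaseSketch and copyWithUpperCase are hypothetical names and nothing here uses the OGSA-DAI classes. It also passes Locale.ROOT to toUpperCase so that the result does not depend on the default locale, which the activity code does not do.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Locale;

// Illustration only: an unmodifiable List stands in for a read-only Tuple.
class CopyWithUpperCaseSketch
{
    /**
     * Builds a new row from a read-only one, converting the value at
     * columnIndex to upper case and copying every other value unchanged.
     */
    static List<Object> copyWithUpperCase(List<Object> row, int columnIndex)
    {
        List<Object> elements = new ArrayList<Object>(row.size());
        for (int i = 0; i < row.size(); ++i)
        {
            Object value = row.get(i);
            if (i == columnIndex)
            {
                // Locale.ROOT keeps the conversion locale-independent
                value = value.toString().toUpperCase(Locale.ROOT);
            }
            elements.add(value);
        }
        return Collections.unmodifiableList(elements);
    }

    public static void main(String[] args)
    {
        List<Object> row = Collections.unmodifiableList(
            Arrays.<Object>asList(1, "Ally Antonioletti", "826 Hume Crescent"));
        System.out.println(copyWithUpperCase(row, 1));
        // prints [1, ALLY ANTONIOLETTI, 826 Hume Crescent]
    }
}
```

The original row is never modified; the caller receives a fresh object, just as the activity writes a new tuple for each one it reads.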
The complete implementation of the server-side Tuple Processing activity is:
// Copyright (c) The University of Edinburgh, 2007.
// See OGSA-DAI-Licence.txt for licencing information.
package uk.org.ogsadai.tutorials.activity;
import java.util.ArrayList;
import uk.org.ogsadai.activity.ActivityProcessingException;
import uk.org.ogsadai.activity.ActivityTerminatedException;
import uk.org.ogsadai.activity.ActivityUserException;
import uk.org.ogsadai.activity.MatchedIterativeActivity;
import uk.org.ogsadai.activity.io.ActivityInput;
import uk.org.ogsadai.activity.io.ActivityPipeProcessingException;
import uk.org.ogsadai.activity.io.BlockWriter;
import uk.org.ogsadai.activity.io.ControlBlock;
import uk.org.ogsadai.activity.io.PipeClosedException;
import uk.org.ogsadai.activity.io.PipeIOException;
import uk.org.ogsadai.activity.io.PipeTerminatedException;
import uk.org.ogsadai.activity.io.TupleListActivityInput;
import uk.org.ogsadai.activity.io.TupleListIterator;
import uk.org.ogsadai.activity.io.TypedActivityInput;
import uk.org.ogsadai.metadata.MetadataWrapper;
import uk.org.ogsadai.tuple.ColumnNotFoundException;
import uk.org.ogsadai.tuple.SimpleTuple;
import uk.org.ogsadai.tuple.Tuple;
import uk.org.ogsadai.tuple.TupleMetadata;
/**
 * Tutorial activity that processes tuples.
 * <p>
 * The activity has two inputs:
 * <ul>
 * <li>columnName: expects inputs of type <tt>java.lang.String</tt> that
 * specify a column name.</li>
 * <li>tuples: expects lists of <tt>uk.org.ogsadai.tuple.Tuple</tt> objects
 * with the first item in the list being a
 * <tt>uk.org.ogsadai.metadata.MetadataWrapper</tt> that holds the
 * <tt>uk.org.ogsadai.tuple.TupleMetadata</tt>.</li>
 * </ul>
 * The activity has one output:
 * <ul>
 * <li>result: outputs lists of <tt>uk.org.ogsadai.tuple.Tuple</tt> objects
 * with the first item in the list being a
 * <tt>uk.org.ogsadai.metadata.MetadataWrapper</tt> that holds the
 * <tt>uk.org.ogsadai.tuple.TupleMetadata</tt>.</li>
 * </ul>
 * The activity outputs the same tuples as it receives except all the column
 * values in the column specified by the 'columnName' input are converted to
 * upper case.
 *
 * @author The OGSA-DAI Project Team
 */
public class TupleProcessingActivity extends MatchedIterativeActivity
{
    /** Copyright notice */
    private static final String COPYRIGHT_NOTICE =
        "Copyright (c) The University of Edinburgh, 2007.";
    /** Block writer used to write the activity's result output */
    private BlockWriter mResultBlockWriter;
    /**
     * Provides the input details.
     */
    protected ActivityInput[] getIterationInputs()
    {
        return new ActivityInput[] {
            new TypedActivityInput("columnName", String.class),
            new TupleListActivityInput("tuples")};
    }
    /**
     * Pre-processing.
     */
    protected void preprocess() throws ActivityUserException,
        ActivityProcessingException, ActivityTerminatedException
    {
        validateOutput("result");
        mResultBlockWriter = getOutput("result");
    }
    /**
     * Process an iteration.
     */
    protected void processIteration(Object[] iterationData)
        throws ActivityProcessingException, ActivityTerminatedException,
        ActivityUserException
    {
        try
        {
            String columnName = (String) iterationData[0];
            TupleListIterator tupleListIterator =
                (TupleListIterator) iterationData[1];
            // Get the tuple metadata - this is held in a metadata wrapper
            MetadataWrapper metadataWrapper =
                tupleListIterator.getMetadataWrapper();
            TupleMetadata tupleMetadata =
                (TupleMetadata) metadataWrapper.getMetadata();
            // Find details of the column we wish to change
            int columnIndex;
            try
            {
                columnIndex =
                    tupleMetadata.getColumnMetadataPosition(columnName);
            }
            catch(ColumnNotFoundException e)
            {
                throw new ActivityUserException(e);
            }
            // Write the output list begin and the metadata
            mResultBlockWriter.write(ControlBlock.LIST_BEGIN);
            mResultBlockWriter.write(metadataWrapper);
            // Process the tuples
            Tuple tuple;
            while( (tuple = (Tuple) tupleListIterator.nextValue()) != null)
            {
                int columnCount = tupleMetadata.getColumnCount();
                ArrayList elements = new ArrayList(columnCount);
                for (int i=0; i<columnCount; ++i)
                {
                    Object columnValue = tuple.getObject(i);
                    if (i==columnIndex)
                    {
                        columnValue = columnValue.toString().toUpperCase();
                    }
                    elements.add(i, columnValue);
                }
                Tuple newTuple = new SimpleTuple(elements);
                mResultBlockWriter.write(newTuple);
            }
            // Write the list end marker
            mResultBlockWriter.write(ControlBlock.LIST_END);
        }
        catch (PipeClosedException e)
        {
            // Consumer does not want any more data, just stop.
        }
        catch (PipeIOException e)
        {
            throw new ActivityPipeProcessingException(e);
        }
        catch (PipeTerminatedException e)
        {
            throw new ActivityTerminatedException();
        }
    }
    /**
     * Post-processing.
     */
    protected void postprocess() throws ActivityUserException,
        ActivityProcessingException, ActivityTerminatedException
    {
        // No post-processing
    }
}
Note that the above code does not handle the error condition that occurs if
the column being converted to upper case is not normally stored in the
tuple as a String. When constructing the
output tuple we always use a String object for the
column value of the converted column. This may not correspond with the
metadata for that column as we just output the metadata unaltered.
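One way to close the gap described above is to check each value before converting it. The following is a hedged, stand-alone sketch of one possible policy, not the tutorial's implementation: UpperCaseGuardSketch and toUpperCaseIfString are hypothetical names, and the choices to pass a null through unchanged and to reject any other non-String value are assumptions made for the illustration. Inside the activity, the rejection would presumably be reported through one of the activity exceptions rather than IllegalArgumentException.

```java
import java.util.Locale;

// Illustration only: one possible policy for values that are not Strings.
class UpperCaseGuardSketch
{
    /**
     * Converts a column value to upper case only when it is a String.
     * A null value is returned unchanged (assumed policy); any other
     * type is rejected so that the output still matches the metadata.
     */
    static Object toUpperCaseIfString(Object columnValue)
    {
        if (columnValue == null)
        {
            return null;
        }
        if (columnValue instanceof String)
        {
            return ((String) columnValue).toUpperCase(Locale.ROOT);
        }
        throw new IllegalArgumentException(
            "Column value is not a String: " + columnValue.getClass().getName());
    }

    public static void main(String[] args)
    {
        System.out.println(toUpperCaseIfString("name")); // prints NAME
        System.out.println(toUpperCaseIfString(null));   // prints null
        try
        {
            toUpperCaseIfString(Integer.valueOf(7));
        }
        catch (IllegalArgumentException e)
        {
            System.out.println(e.getMessage());
            // prints Column value is not a String: java.lang.Integer
        }
    }
}
```

Rejecting the value keeps the output consistent with the unaltered metadata; the alternative of converting silently is what the listing above does.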
The client-side implementation of the Tuple Processing activity is very
similar to the client-side implementations seen in the earlier examples.
In many ways it is even easier in this example because Tuple
objects
are not one of the primitive objects that OGSA-DAI allows as literal
inputs or values that can be written to the request status. There is
therefore no need for an addTuples method nor
hasNextResult and
nextResult methods.
The complete client-toolkit activity implementation is:
// Copyright (c) The University of Edinburgh, 2007.
// See OGSA-DAI-Licence.txt for licencing information.
package uk.org.ogsadai.tutorial.activity.client;
import uk.org.ogsadai.activity.ActivityName;
import uk.org.ogsadai.client.toolkit.Activity;
import uk.org.ogsadai.client.toolkit.ActivityOutput;
import uk.org.ogsadai.client.toolkit.SingleActivityOutput;
import uk.org.ogsadai.client.toolkit.activity.ActivityInput;
import uk.org.ogsadai.client.toolkit.activity.BaseActivity;
import uk.org.ogsadai.client.toolkit.activity.SimpleActivityInput;
import uk.org.ogsadai.client.toolkit.activity.SimpleActivityOutput;
import uk.org.ogsadai.client.toolkit.exception.ActivityIOIllegalStateException;
import uk.org.ogsadai.data.StringData;
/**
* Client toolkit activity used to call the TupleProcessing activity.
*
* @author The OGSA-DAI Project Team
*/
public class TupleProcessing extends BaseActivity implements Activity
{
    /** Copyright notice */
    private static final String COPYRIGHT_NOTICE =
        "Copyright (c) The University of Edinburgh, 2007.";
    /** Default activity name */
    public final static ActivityName DEFAULT_ACTIVITY_NAME =
        new ActivityName("uk.org.ogsadai.tutorial.TupleProcessing");
    /** The columnName input */
    private ActivityInput mColumnNameInput;
    /** The tuples input */
    private ActivityInput mTuplesInput;
    /** The activity result output */
    private ActivityOutput mResultOutput;
    /**
     * Constructor.
     */
    public TupleProcessing()
    {
        super(DEFAULT_ACTIVITY_NAME);
        mColumnNameInput = new SimpleActivityInput("columnName");
        mTuplesInput = new SimpleActivityInput("tuples");
        mResultOutput = new SimpleActivityOutput("result");
    }
    /**
     * Adds a value to the columnName input.
     *
     * @param columnName the columnName.
     */
    public void addColumnName(String columnName)
    {
        mColumnNameInput.add(new StringData(columnName));
    }
    /**
     * Connects the columnName input to the given output.
     *
     * @param output output to connect to.
     */
    public void connectColumnNameInput(SingleActivityOutput output)
    {
        mColumnNameInput.connect(output);
    }
    /**
     * Connects the tuples input to the given output.
     *
     * @param output output to connect to.
     */
    public void connectTuplesInput(SingleActivityOutput output)
    {
        mTuplesInput.connect(output);
    }
    /**
     * Gets the result output so that it can be connected to the input of other
     * activities.
     *
     * @return the activity output.
     */
    public SingleActivityOutput getResultOutput()
    {
        return mResultOutput.getSingleActivityOutputs()[0];
    }
    /**
     * Gets the activity inputs.
     */
    protected ActivityInput[] getInputs()
    {
        return new ActivityInput[]{mColumnNameInput, mTuplesInput};
    }
    /**
     * Gets the activity outputs.
     */
    protected ActivityOutput[] getOutputs()
    {
        return new ActivityOutput[]{mResultOutput};
    }
    /**
     * Validates the data of the inputs and outputs.
     */
    protected void validateIOState() throws ActivityIOIllegalStateException
    {
        // No further validation to do
    }
}
An example client application that uses this activity is shown here.
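You may have to change the base services URL
in the main method depending on your
server. The OGSA-DAI Axis default is:
http://localhost:8080/dai/services/
The listing itself uses http://localhost:8080/wsrf/services/dai/, so
replace that value if your server uses the Axis default.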
// Copyright (c) The University of Edinburgh, 2007.
// See OGSA-DAI-Licence.txt for licencing information.
package uk.org.ogsadai.tutorial.activity.client.apps;
import java.net.URL;
import java.sql.ResultSet;
import uk.org.ogsadai.client.toolkit.DataRequestExecutionResource;
import uk.org.ogsadai.client.toolkit.PipelineWorkflow;
import uk.org.ogsadai.client.toolkit.RequestExecutionType;
import uk.org.ogsadai.client.toolkit.ServerProxy;
import uk.org.ogsadai.client.toolkit.activities.delivery.DeliverToRequestStatus;
import uk.org.ogsadai.client.toolkit.activities.sql.SQLQuery;
import uk.org.ogsadai.client.toolkit.activities.transform.TupleToWebRowSetCharArrays;
import uk.org.ogsadai.client.toolkit.exception.RequestExecutionException;
import uk.org.ogsadai.resource.ResourceID;
import uk.org.ogsadai.tutorial.activity.client.TupleProcessing;
/**
* Application for the TupleProcessing activity tutorial.
*
* @author The OGSA-DAI Project Team
*/
public class TupleProcessingApp
{
    /** Copyright notice */
    private static final String COPYRIGHT_NOTICE =
        "Copyright (c) The University of Edinburgh, 2007.";
    /**
     * Main method.
     *
     * @param args command line arguments.
     *
     * @throws Exception if an unexpected error occurs
     */
    public static void main(String[] args) throws Exception
    {
        URL serverBaseUrl = new URL("http://localhost:8080/wsrf/services/dai/");
        ResourceID drerId = new ResourceID("DataRequestExecutionResource");
        ServerProxy serverProxy = new ServerProxy();
        serverProxy.setDefaultBaseServicesURL(serverBaseUrl);
        DataRequestExecutionResource drer =
            serverProxy.getDataRequestExecutionResource(drerId);
        // Create the activities
        SQLQuery sqlQuery = new SQLQuery();
        sqlQuery.setResourceID("JDBCResource");
        sqlQuery.addExpression(
            "select id, name, address from littleblackbook where id < 10");
        TupleProcessing tupleProcessing = new TupleProcessing();
        tupleProcessing.addColumnName("name");
        tupleProcessing.connectTuplesInput(sqlQuery.getDataOutput());
        TupleToWebRowSetCharArrays tupleToWebRowSet =
            new TupleToWebRowSetCharArrays();
        tupleToWebRowSet.connectDataInput(tupleProcessing.getResultOutput());
        DeliverToRequestStatus deliverToRequestStatus =
            new DeliverToRequestStatus();
        deliverToRequestStatus.connectInput(tupleToWebRowSet.getResultOutput());
        // Create the workflow
        PipelineWorkflow pipeline = new PipelineWorkflow();
        pipeline.add(sqlQuery);
        pipeline.add(tupleProcessing);
        pipeline.add(tupleToWebRowSet);
        pipeline.add(deliverToRequestStatus);
        // Execute the workflow
        try
        {
            drer.execute(pipeline, RequestExecutionType.SYNCHRONOUS);
        }
        catch( RequestExecutionException e)
        {
            System.out.println("There was an error executing the workflow");
            System.out.println(e.getRequestResource().getRequestStatus());
            throw e;
        }
        // Get the result and display it
        while(tupleToWebRowSet.hasNextResult())
        {
            ResultSet rs = tupleToWebRowSet.nextResultAsResultSet();
            while(rs.next())
            {
                System.out.print(rs.getInt(1));
                System.out.print("\t");
                System.out.print(rs.getString(2));
                System.out.print("\t");
                System.out.println(rs.getString(3));
            }
        }
    }
}
Note that you may need to alter the value used for the server base URL to
correspond to your server installation. You may also need to alter the
resource ID of the relational resource the SQLQuery
activity is targeted at. When the relational resource exposes the OGSA-DAI
example littleblackbook database the output from this application is:
1    ALLY ANTONIOLETTI    826 Hume Crescent, Southampton
2    AMY ATKINSON    583 Atkinson Drive, Southampton
3    ANDREW BORLEY    354 Jackson Road, Edinburgh
4    CHARAKA CHUE HONG    750 Pearson Crescent, Southampton
5    DAVE HARDMAN    079 Borley Gardens, Winchester
6    GEORGE HICKEN    398 Magowan Street, Winchester
7    JAMES HUME    801 Laws Gardens, Edinburgh
8    MALCOLM JACKSON    743 Krause Lane, Edinburgh
9    MARIO KRAUSE    026 Atkinson Gardens, Winchester
To run this client successfully you must first install the server-side
activity on your server. To do this, compile your server-side activity
into a jar, for example TupleProcessing.jar,
then follow the instructions in Section 16.1.8, “Deploying an activity” and
Section 16.1.9, “Extending the supported activities of a resource”. When following these instructions
use the following values:
dai.activity.id: uk.org.ogsadai.tutorial.TupleProcessing
dai.activity.name: uk.org.ogsadai.tutorial.TupleProcessing
dai.activity.class: uk.org.ogsadai.tutorials.activity.TupleProcessingActivity
dai.activity.description: "Tuple Processing activity"
The dai.activity.id and
dai.activity.name values must match the default
activity name used in the constructor of the client-side activity implementation class
TupleProcessing. The dai.activity.class
value must be the full class name of the server-side activity class. The
dai.activity.description can be any suitable
textual description.