Chapter 29. Tuple processing example activity

29.1. Developing the server-side activity
29.2. Developing the client-side activity
29.3. Developing the client application

This example activity processes tuples as received from the output of an SQL query. The activity has two inputs. The first input is called columnName and expects to receive objects of type String. The second input is called tuples and expects to receive a list of objects of type uk.org.ogsadai.tuple.Tuple, which is the OGSA-DAI representation of a tuple. The activity has one output called result. For each value received in the columnName input, the activity processes the corresponding list of tuples and outputs the same list of tuples but with the values in the column identified by the columnName input changed to upper case.

Strictly speaking, the first object in the list received by the tuples input is not a Tuple but an instance of uk.org.ogsadai.metadata.MetadataWrapper, which gives access to the metadata describing the tuples' columns. This class is used to identify metadata objects that are treated in a special way by some activities. The MetadataWrapper object contains an object of type uk.org.ogsadai.tuple.TupleMetadata that holds the metadata describing the columns of the tuples. So what is referred to as a list of tuples actually looks like: [<Metadata>, <Tuple>, <Tuple>, ....].
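The layout described above can be illustrated with a small, self-contained sketch. It does not use the OGSA-DAI classes: MetadataBlock, buildTupleList and countTuples are hypothetical stand-ins invented for this illustration, and rows are modelled as plain Object arrays.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustration only: none of these types are OGSA-DAI classes.
final class TupleListLayoutSketch
{
    /** Hypothetical stand-in for the metadata object at the head of a list. */
    static final class MetadataBlock
    {
        final List<String> columnNames;

        MetadataBlock(String... columnNames)
        {
            this.columnNames = Arrays.asList(columnNames);
        }
    }

    /** Builds a "tuple list": the metadata block first, then the rows. */
    static List<Object> buildTupleList(MetadataBlock metadata, List<Object[]> rows)
    {
        List<Object> blocks = new ArrayList<Object>();
        blocks.add(metadata);
        blocks.addAll(rows);
        return blocks;
    }

    /** Counts the rows, checking that the first block is the metadata. */
    static int countTuples(List<Object> tupleList)
    {
        if (tupleList.isEmpty() || !(tupleList.get(0) instanceof MetadataBlock))
        {
            throw new IllegalArgumentException(
                "The first block must be the metadata block");
        }
        return tupleList.size() - 1;
    }

    public static void main(String[] args)
    {
        List<Object[]> rows = new ArrayList<Object[]>();
        rows.add(new Object[] {1, "Ally Antonioletti"});
        rows.add(new Object[] {2, "Amy Atkinson"});

        List<Object> tupleList =
            buildTupleList(new MetadataBlock("id", "name"), rows);
        System.out.println("Tuples in list: " + countTuples(tupleList));
    }
}
```

A consumer of such a list has to treat the first block differently from the rest, which is why the server-side code in this chapter reads the metadata before it iterates over the tuples.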

29.1. Developing the server-side activity

The server-side Tuple Processing activity implementation extends the MatchedIterativeActivity base class. The getIterationInputs method is implemented as:

protected ActivityInput[] getIterationInputs()
{
    return new ActivityInput[] {
        new TypedActivityInput("columnName", String.class),
        new TupleListActivityInput("tuples")};
}

Here we use TupleListActivityInput, a special class for inputs that expect to receive lists of tuples.
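The pairing of the two inputs can be pictured with the following self-contained sketch. It is not the MatchedIterativeActivity implementation: matchInputs is a hypothetical helper, and rejecting inputs of different lengths is an assumption made for this illustration only. What it does mirror from this chapter is the shape of the iteration data, with the column name at index 0 and the tuple list at index 1, in the order the inputs are declared.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustration only: this models the idea of matched inputs, not OGSA-DAI.
final class MatchedIterationSketch
{
    /**
     * Pairs the i-th column name with the i-th tuple list. Each returned
     * array plays the role of the iterationData array for one iteration.
     */
    static List<Object[]> matchInputs(
        List<String> columnNames, List<List<Object>> tupleLists)
    {
        // Assumption for this sketch: mismatched inputs are rejected.
        if (columnNames.size() != tupleLists.size())
        {
            throw new IllegalArgumentException(
                "Each column name needs exactly one tuple list");
        }
        List<Object[]> iterations = new ArrayList<Object[]>();
        for (int i = 0; i < columnNames.size(); ++i)
        {
            iterations.add(
                new Object[] {columnNames.get(i), tupleLists.get(i)});
        }
        return iterations;
    }

    public static void main(String[] args)
    {
        List<List<Object>> tupleLists = new ArrayList<List<Object>>();
        tupleLists.add(Arrays.<Object>asList("first list"));
        tupleLists.add(Arrays.<Object>asList("second list"));

        List<Object[]> iterations =
            matchInputs(Arrays.asList("name", "address"), tupleLists);
        for (Object[] iterationData : iterations)
        {
            System.out.println(iterationData[0] + " -> " + iterationData[1]);
        }
    }
}
```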

The processIteration method is implemented (with exceptions removed for simplicity) as:

 
protected void processIteration(Object[] iterationData)
{
    String columnName = (String) iterationData[0];
    TupleListIterator tupleListIterator = 
        (TupleListIterator) iterationData[1];
    
    // Get the tuple metadata - this is held in a metadata wrapper
    MetadataWrapper metadataWrapper = 
        tupleListIterator.getMetadataWrapper();
    TupleMetadata tupleMetadata = 
        (TupleMetadata) metadataWrapper.getMetadata();
            
    // Find details of the column we wish to change
    int columnIndex;
    try
    {
        columnIndex = 
            tupleMetadata.getColumnMetadataPosition(columnName);
    }
    catch(ColumnNotFoundException e)
    {
        throw new ActivityUserException(e);
    }
    
    // Write the output list begin and the metadata
    mResultBlockWriter.write(ControlBlock.LIST_BEGIN);
    mResultBlockWriter.write(metadataWrapper);
            
    // Process the tuples
    Tuple tuple;
    while( (tuple = (Tuple) tupleListIterator.nextValue()) != null)
    {
        int columnCount = tupleMetadata.getColumnCount();
                
        ArrayList elements = new ArrayList(columnCount);
        for (int i=0; i<columnCount; ++i)
        {
            Object columnValue = tuple.getObject(i);
            if (i==columnIndex)
            {
                columnValue = columnValue.toString().toUpperCase();
            }
            elements.add(i, columnValue);
        }
        Tuple newTuple = new SimpleTuple(elements);
                
        mResultBlockWriter.write(newTuple);
    }
            
    // Write the list end marker
    mResultBlockWriter.write(ControlBlock.LIST_END);
}

The first part of this method obtains the MetadataWrapper object and extracts the TupleMetadata object from it. The TupleMetadata object is then used to determine the index of the column whose name matches the value received from the columnName input. We can then process the list, remembering to write the list begin and list end markers and to write the MetadataWrapper object as the first object in the list. The change we make to the tuples does not change the metadata, so we can output the same MetadataWrapper that we received from the input. The Tuple interface does not allow elements of a tuple to be altered, so we have to construct a new Tuple object and copy the contents of the original tuple into it, changing the appropriate column value to upper case as we do so.
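The copy-and-convert step can be tried in isolation with the following self-contained sketch. It models a read-only tuple as an unmodifiable java.util.List rather than using the OGSA-DAI Tuple and SimpleTuple classes, and upperCaseColumn is a hypothetical helper written for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Illustration only: a read-only tuple is modelled as an unmodifiable List.
final class UpperCaseColumnSketch
{
    /**
     * Copies the tuple into a new read-only list, converting the value at
     * columnIndex to upper case. The original tuple is left untouched.
     */
    static List<Object> upperCaseColumn(List<Object> tuple, int columnIndex)
    {
        List<Object> elements = new ArrayList<Object>(tuple.size());
        for (int i = 0; i < tuple.size(); ++i)
        {
            Object columnValue = tuple.get(i);
            if (i == columnIndex)
            {
                columnValue = columnValue.toString().toUpperCase();
            }
            elements.add(columnValue);
        }
        return Collections.unmodifiableList(elements);
    }

    public static void main(String[] args)
    {
        List<Object> original = Collections.unmodifiableList(
            Arrays.<Object>asList(1, "Ally Antonioletti", "826 Hume Crescent"));
        List<Object> converted = upperCaseColumn(original, 1);

        System.out.println("original:  " + original);
        System.out.println("converted: " + converted);
    }
}
```

Because the input cannot be modified, every row costs one new list. The same trade-off applies to the server-side code above, which builds a new tuple for every tuple it reads.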

The complete implementation of the server-side Tuple Processing activity is:

 
// Copyright (c) The University of Edinburgh, 2007.
// See OGSA-DAI-Licence.txt for licencing information.

package uk.org.ogsadai.tutorials.activity;

import java.util.ArrayList;

import uk.org.ogsadai.activity.ActivityProcessingException;
import uk.org.ogsadai.activity.ActivityTerminatedException;
import uk.org.ogsadai.activity.ActivityUserException;
import uk.org.ogsadai.activity.MatchedIterativeActivity;
import uk.org.ogsadai.activity.io.ActivityInput;
import uk.org.ogsadai.activity.io.ActivityPipeProcessingException;
import uk.org.ogsadai.activity.io.BlockWriter;
import uk.org.ogsadai.activity.io.ControlBlock;
import uk.org.ogsadai.activity.io.PipeClosedException;
import uk.org.ogsadai.activity.io.PipeIOException;
import uk.org.ogsadai.activity.io.PipeTerminatedException;
import uk.org.ogsadai.activity.io.TupleListActivityInput;
import uk.org.ogsadai.activity.io.TupleListIterator;
import uk.org.ogsadai.activity.io.TypedActivityInput;
import uk.org.ogsadai.metadata.MetadataWrapper;
import uk.org.ogsadai.tuple.ColumnNotFoundException;
import uk.org.ogsadai.tuple.SimpleTuple;
import uk.org.ogsadai.tuple.Tuple;
import uk.org.ogsadai.tuple.TupleMetadata;

/**
 * Tutorial activity that processes tuples. 
 * <p>
 * The activity has two inputs:
 * <ul>
 *   <li>columnName: expects input of type <tt>java.lang.String</tt> that
 *       specifies a column name.</li>
 *   <li>tuples: expects lists of <tt>uk.org.ogsadai.tuple.Tuple</tt> objects
 *       with the first item in the list being a
 *       <tt>uk.org.ogsadai.metadata.MetadataWrapper</tt> that holds the
 *       <tt>uk.org.ogsadai.tuple.TupleMetadata</tt>.</li>
 * </ul>
 * The activity has one output:
 * <ul>
 *   <li>result: outputs lists of <tt>uk.org.ogsadai.tuple.Tuple</tt> objects
 *       with the first item in the list being a
 *       <tt>uk.org.ogsadai.metadata.MetadataWrapper</tt> that holds the
 *       <tt>uk.org.ogsadai.tuple.TupleMetadata</tt>.</li>
 * </ul>
 * The activity outputs the same tuples as it receives except all the column
 * values in the column specified by the 'columnName' input are converted to
 * upper case.
 *
 * @author The OGSA-DAI Project Team
 */
public class TupleProcessingActivity extends MatchedIterativeActivity
{
    /** Copyright notice */
    private static final String COPYRIGHT_NOTICE = 
        "Copyright (c) The University of Edinburgh,  2007.";

    /** Block writer used to write the activity's result output */
    private BlockWriter mResultBlockWriter;

    /**
     * Provides the input details.
     */
    protected ActivityInput[] getIterationInputs()
    {
        return new ActivityInput[] {
            new TypedActivityInput("columnName", String.class),
            new TupleListActivityInput("tuples")};
    }
 
    /**
     * Pre-processing.
     */
    protected void preprocess() throws ActivityUserException,
        ActivityProcessingException, ActivityTerminatedException
    {
        validateOutput("result");
        mResultBlockWriter = getOutput("result");
    }

    /**
     * Process an iteration.
     */
    protected void processIteration(Object[] iterationData)
        throws ActivityProcessingException, ActivityTerminatedException,
        ActivityUserException
    {
        try
        {
            String columnName = (String) iterationData[0];
            TupleListIterator tupleListIterator = 
                (TupleListIterator) iterationData[1];
    
            // Get the tuple metadata - this is held in a metadata wrapper
            MetadataWrapper metadataWrapper = 
                tupleListIterator.getMetadataWrapper();
            TupleMetadata tupleMetadata = 
                (TupleMetadata) metadataWrapper.getMetadata();
            
            // Find details of the column we wish to change
            int columnIndex;
            try
            {
                columnIndex = 
                    tupleMetadata.getColumnMetadataPosition(columnName);
            }
            catch(ColumnNotFoundException e)
            {
                throw new ActivityUserException(e);
            }
    
            // Write the output list begin and the metadata
            mResultBlockWriter.write(ControlBlock.LIST_BEGIN);
            mResultBlockWriter.write(metadataWrapper);
            
            // Process the tuples
            Tuple tuple;
            while( (tuple = (Tuple) tupleListIterator.nextValue()) != null)
            {
                int columnCount = tupleMetadata.getColumnCount();
                
                ArrayList elements = new ArrayList(columnCount);
                for (int i=0; i<columnCount; ++i)
                {
                    Object columnValue = tuple.getObject(i);
                    if (i==columnIndex)
                    {
                        columnValue = columnValue.toString().toUpperCase();
                    }
                    elements.add(i, columnValue);
                }
                Tuple newTuple = new SimpleTuple(elements);
                
                mResultBlockWriter.write(newTuple);
            }
            
            // Write the list end marker
            mResultBlockWriter.write(ControlBlock.LIST_END);
        }
        catch (PipeClosedException e)
        {
            // Consumer does not want any more data, just stop.
        }
        catch (PipeIOException e)
        {
            throw new ActivityPipeProcessingException(e);
        }
        catch (PipeTerminatedException e)
        {
            throw new ActivityTerminatedException();
        }
    }

    /**
     * Post-processing.
     */
    protected void postprocess() throws ActivityUserException,
        ActivityProcessingException, ActivityTerminatedException
    {
        // No post-processing
    }
}

Note that the above code does not handle the case where the column being converted to upper case is not stored in the tuple as a String. When constructing the output tuple we always use a String object for the value of the converted column. This may not correspond with the metadata for that column, as we output the metadata unaltered.
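One possible way to avoid this mismatch is to convert a value only when it is already a String and to pass any other value through unchanged. The sketch below shows that check on its own. toUpperCaseIfString is a hypothetical helper, not part of OGSA-DAI, and an activity could equally choose to report an error for a non-String column instead.

```java
// Illustration only: a guard that keeps non-String values unchanged.
final class StringColumnCheckSketch
{
    /**
     * Returns the upper-case form of a String value. Any other value,
     * including null, is returned unchanged so that its type still agrees
     * with the unaltered column metadata.
     */
    static Object toUpperCaseIfString(Object columnValue)
    {
        if (columnValue instanceof String)
        {
            return ((String) columnValue).toUpperCase();
        }
        return columnValue;
    }

    public static void main(String[] args)
    {
        System.out.println(toUpperCaseIfString("edinburgh"));
        System.out.println(toUpperCaseIfString(Integer.valueOf(42)));
    }
}
```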

29.2. Developing the client-side activity

The client-side implementation of the Tuple Processing activity is very similar to the client-side implementations seen in the earlier examples. In many ways it is even simpler, because Tuple objects are not among the primitive objects that OGSA-DAI allows as literal inputs or as values that can be written to the request status. There is therefore no need for an addTuples method, nor for hasNextResult and nextResult methods.

The complete client-toolkit activity implementation is:

// Copyright (c) The University of Edinburgh, 2007.
// See OGSA-DAI-Licence.txt for licencing information.

package uk.org.ogsadai.tutorial.activity.client;

import uk.org.ogsadai.activity.ActivityName;
import uk.org.ogsadai.client.toolkit.Activity;
import uk.org.ogsadai.client.toolkit.ActivityOutput;
import uk.org.ogsadai.client.toolkit.SingleActivityOutput;
import uk.org.ogsadai.client.toolkit.activity.ActivityInput;
import uk.org.ogsadai.client.toolkit.activity.BaseActivity;
import uk.org.ogsadai.client.toolkit.activity.SimpleActivityInput;
import uk.org.ogsadai.client.toolkit.activity.SimpleActivityOutput;
import uk.org.ogsadai.client.toolkit.exception.ActivityIOIllegalStateException;
import uk.org.ogsadai.data.StringData;

/**
 * Client toolkit activity used to call the TupleProcessing activity.
 *
 * @author The OGSA-DAI Project Team
 */
public class TupleProcessing extends BaseActivity implements Activity
{
    /** Copyright notice */
    private static final String COPYRIGHT_NOTICE = 
        "Copyright (c) The University of Edinburgh,  2007.";

    /** Default activity name */
    public final static ActivityName DEFAULT_ACTIVITY_NAME = 
        new ActivityName("uk.org.ogsadai.tutorial.TupleProcessing");

    /** The columnName input */
    private ActivityInput mColumnNameInput;

    /** The tuples input */
    private ActivityInput mTuplesInput;

    /** The activity result output */
    private ActivityOutput mResultOutput;

    /**
     * Constructor.
     */
    public TupleProcessing()
    {
        super(DEFAULT_ACTIVITY_NAME);
        mColumnNameInput = new SimpleActivityInput("columnName");
        mTuplesInput = new SimpleActivityInput("tuples");
        mResultOutput = new SimpleActivityOutput("result");
    }
    
    /**
     * Adds a value to the columnName input.
     * 
     * @param columnName the columnName.
     */
    public void addColumnName(String columnName)
    {
        mColumnNameInput.add(new StringData(columnName));
    }
    
    /**
     * Connects the columnName input to the given output.
     * 
     * @param output output to connect to.
     */
    public void connectColumnNameInput(SingleActivityOutput output)
    {
        mColumnNameInput.connect(output);
    }

    /**
     * Connects the tuples input to the given output.
     * 
     * @param output output to connect to.
     */
    public void connectTuplesInput(SingleActivityOutput output)
    {
        mTuplesInput.connect(output);
    }

    
    /**
     * Gets the result output so that it can be connected to the input of other
     * activities.
     * 
     * @return the activity output.
     */
    public SingleActivityOutput getResultOutput()
    {
        return mResultOutput.getSingleActivityOutputs()[0];
    }
    
    
    /**
     * Gets the activity inputs.
     */
    protected ActivityInput[] getInputs()
    {
        return new ActivityInput[]{mColumnNameInput, mTuplesInput};
    }

    /**
     * Gets the activity outputs.
     */
    protected ActivityOutput[] getOutputs()
    {
        return new ActivityOutput[]{mResultOutput};
    }

    /**
     * Validates the data of the inputs and outputs.
     */
    protected void validateIOState() throws ActivityIOIllegalStateException
    {
        // No further validation to do
    }
}

29.3. Developing the client application

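An example client application that uses this activity is shown here. You may have to change the base services URL in the main method depending on your server. The listing below uses http://localhost:8080/wsrf/services/dai/, whereas the default for an OGSA-DAI Axis installation is: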

  • OGSA-DAI Axis Default: http://localhost:8080/dai/services/

// Copyright (c) The University of Edinburgh, 2007.
// See OGSA-DAI-Licence.txt for licencing information.

package uk.org.ogsadai.tutorial.activity.client.apps;

import java.net.URL;
import java.sql.ResultSet;

import uk.org.ogsadai.client.toolkit.DataRequestExecutionResource;
import uk.org.ogsadai.client.toolkit.PipelineWorkflow;
import uk.org.ogsadai.client.toolkit.RequestExecutionType;
import uk.org.ogsadai.client.toolkit.ServerProxy;
import uk.org.ogsadai.client.toolkit.activities.delivery.DeliverToRequestStatus;
import uk.org.ogsadai.client.toolkit.activities.sql.SQLQuery;
import uk.org.ogsadai.client.toolkit.activities.transform.TupleToWebRowSetCharArrays;
import uk.org.ogsadai.client.toolkit.exception.RequestExecutionException;
import uk.org.ogsadai.resource.ResourceID;
import uk.org.ogsadai.tutorial.activity.client.TupleProcessing;

/**
 * Application for the TupleProcessing activity tutorial.
 *
 * @author The OGSA-DAI Project Team
 */
public class TupleProcessingApp
{
    /** Copyright notice */
    private static final String COPYRIGHT_NOTICE = 
        "Copyright (c) The University of Edinburgh,  2007.";

    /**
     * Main method.
     * 
     * @param args command line arguments.
     * 
     * @throws Exception if an unexpected error occurs
     */
    public static void main(String[] args) throws Exception
    {
        URL serverBaseUrl = new URL("http://localhost:8080/wsrf/services/dai/");
        ResourceID drerId = new ResourceID("DataRequestExecutionResource");
        
        ServerProxy serverProxy = new ServerProxy();
        serverProxy.setDefaultBaseServicesURL(serverBaseUrl);
        
        DataRequestExecutionResource drer = 
            serverProxy.getDataRequestExecutionResource(drerId);
        
        // Create the activities
        SQLQuery sqlQuery = new SQLQuery();
        sqlQuery.setResourceID("JDBCResource");
        sqlQuery.addExpression(
            "select id, name, address from littleblackbook where id < 10");
        TupleProcessing tupleProcessing = new TupleProcessing();
        tupleProcessing.addColumnName("name");
        tupleProcessing.connectTuplesInput(sqlQuery.getDataOutput());
        TupleToWebRowSetCharArrays tupleToWebRowSet = 
            new TupleToWebRowSetCharArrays();
        tupleToWebRowSet.connectDataInput(tupleProcessing.getResultOutput());
        DeliverToRequestStatus deliverToRequestStatus = 
            new DeliverToRequestStatus();
        deliverToRequestStatus.connectInput(tupleToWebRowSet.getResultOutput());

        // Create the workflow
        PipelineWorkflow pipeline = new PipelineWorkflow();
        pipeline.add(sqlQuery);
        pipeline.add(tupleProcessing);
        pipeline.add(tupleToWebRowSet);
        pipeline.add(deliverToRequestStatus);
        
        // Execute the workflow
        try
        {
            drer.execute(pipeline, RequestExecutionType.SYNCHRONOUS);
        }
        catch( RequestExecutionException e)
        {
            System.out.println("There was an error executing the workflow");
            System.out.println(e.getRequestResource().getRequestStatus());
            throw e;
        }
        
        // Get the result and display it
        while(tupleToWebRowSet.hasNextResult())
        {
            ResultSet rs = tupleToWebRowSet.nextResultAsResultSet();
            
            while(rs.next())
            {
                System.out.print(rs.getInt(1));
                System.out.print("\t");
                System.out.print(rs.getString(2));
                System.out.print("\t");
                System.out.println(rs.getString(3));
            }
        }
    }
}

Note that you may need to alter the value used for the server base URL to correspond to your server installation. You may also need to alter the resource ID of the relational resource that the SQLQuery activity is targeted at. When the relational resource exposes the OGSA-DAI example littleblackbook database, the output from this application is:

1	ALLY ANTONIOLETTI	826 Hume Crescent, Southampton
2	AMY ATKINSON	583 Atkinson Drive, Southampton
3	ANDREW BORLEY	354 Jackson Road, Edinburgh
4	CHARAKA CHUE HONG	750 Pearson Crescent, Southampton
5	DAVE HARDMAN	079 Borley Gardens, Winchester
6	GEORGE HICKEN	398 Magowan Street, Winchester
7	JAMES HUME	801 Laws Gardens, Edinburgh
8	MALCOLM JACKSON	743 Krause Lane, Edinburgh
9	MARIO KRAUSE	026 Atkinson Gardens, Winchester

To run this client successfully, you must first install the server-side activity on your server. To do this, compile your server-side activity into a jar (for example, TupleProcessing.jar), then follow the instructions at Section 16.1.8, “Deploying an activity” and Section 16.1.9, “Extending the supported activities of a resource”. When following these instructions use the following values:

  • dai.activity.id: uk.org.ogsadai.tutorial.TupleProcessing
  • dai.activity.name: uk.org.ogsadai.tutorial.TupleProcessing
  • dai.activity.class: uk.org.ogsadai.tutorials.activity.TupleProcessingActivity
  • dai.activity.description: "Tuple Processing activity"

The dai.activity.id and dai.activity.name values must match the default activity name used in the constructor of the client-side activity implementation class TupleProcessing. The dai.activity.class value must be the full class name of the server-side activity class. The dai.activity.description can be any suitable textual description.