Chapter 28. List processing example activity

28.1. Developing the server-side activity
28.2. Developing the client-side activity
28.3. Developing the client application

This example activity, called List Processing, takes a list as input and outputs the list with its contents slightly altered. The activity has two inputs. The first, called greeting, expects values of type String. The second, called names, expects lists of String objects. The single output is called output. For each list received, the activity prefixes each value in the list with the greeting string followed by a space and outputs the new list.
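The transformation itself can be sketched in plain Java, independent of OGSA-DAI. This is only an illustration of what the activity computes for a single list. The class name GreetingSketch and the method greetAll are hypothetical and are not part of the OGSA-DAI API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-alone sketch; not part of OGSA-DAI.
public class GreetingSketch
{
    /**
     * Returns a new list in which every name is prefixed by the
     * greeting followed by a space.
     */
    public static List<String> greetAll(String greeting, List<String> names)
    {
        List<String> result = new ArrayList<String>();
        for (String name : names)
        {
            result.add(greeting + " " + name);
        }
        return result;
    }

    public static void main(String[] args)
    {
        // Prints [Hello Rod, Hello Jane, Hello Freddy]
        System.out.println(
            greetAll("Hello", Arrays.asList("Rod", "Jane", "Freddy")));
    }
}
```

The real activity differs in that it reads and writes streams of blocks rather than in-memory lists, as the following sections show.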

28.1. Developing the server-side activity

The server-side activity implementation class inherits from the MatchedIterativeActivity base class and is very similar to the server-side activity implementations in the previous examples. The only major difference is that this activity outputs a list, so it must also output the list begin and list end markers. The processIteration method (with exception handling omitted for simplicity) is:

protected void processIteration(Object[] iterationData)
{
    String greeting = (String) iterationData[0];
    ListIterator values = (ListIterator) iterationData[1];
    
    // Output the list begin marker
    mOutputBlockWriter.write(ControlBlock.LIST_BEGIN);
            
    // Process the contents of the list
    String name;
    while( (name = (String) values.nextValue()) != null)
    {
        mOutputBlockWriter.write(greeting + " " + name);
    }
        
    // Output the list end marker
    mOutputBlockWriter.write(ControlBlock.LIST_END);
}

This is very similar to the processIteration method in the List Reduction example. Again, a ListIterator is used to access the data in the list. In this implementation we write a list begin marker to the output before we start to process the list, then write one value to the output for each object read from the list, and finally write a list end marker.
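To see why the markers matter, the following plain-Java sketch models an output as a flat sequence of blocks and shows how a consumer can regroup the values into lists using the markers alone. It is a simplified model under stated assumptions: the LIST_BEGIN and LIST_END constants here are hypothetical stand-ins for ControlBlock.LIST_BEGIN and ControlBlock.LIST_END, and a java.util.List stands in for the output pipe.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of a block stream; not OGSA-DAI code.
public class ListMarkerSketch
{
    // Stand-ins for ControlBlock.LIST_BEGIN and ControlBlock.LIST_END
    public static final Object LIST_BEGIN = new Object();
    public static final Object LIST_END = new Object();

    /** Writes one greeted list to the stream, bracketed by markers. */
    public static void writeList(
        List<Object> stream, String greeting, List<String> names)
    {
        stream.add(LIST_BEGIN);
        for (String name : names)
        {
            stream.add(greeting + " " + name);
        }
        stream.add(LIST_END);
    }

    /** Regroups a flat stream of blocks into lists using the markers. */
    public static List<List<String>> readLists(List<Object> stream)
    {
        List<List<String>> lists = new ArrayList<List<String>>();
        List<String> current = null;
        for (Object block : stream)
        {
            if (block == LIST_BEGIN)
            {
                current = new ArrayList<String>();
            }
            else if (block == LIST_END)
            {
                lists.add(current);
                current = null;
            }
            else if (current == null)
            {
                throw new IllegalStateException("Value outside a list");
            }
            else
            {
                current.add((String) block);
            }
        }
        return lists;
    }

    public static void main(String[] args)
    {
        List<Object> stream = new ArrayList<Object>();
        writeList(stream, "Hello", Arrays.asList("Roderick", "Frederick"));
        writeList(stream, "Hi", Arrays.asList("Rod"));
        System.out.println(readLists(stream));
    }
}
```

Without the markers the consumer would see only a run of String values and could not tell where one list ends and the next begins.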

The complete implementation of the server-side List Processing activity is:

// Copyright (c) The University of Edinburgh, 2007.
// See OGSA-DAI-Licence.txt for licencing information.

package uk.org.ogsadai.tutorials.activity;

import uk.org.ogsadai.activity.ActivityProcessingException;
import uk.org.ogsadai.activity.ActivityTerminatedException;
import uk.org.ogsadai.activity.ActivityUserException;
import uk.org.ogsadai.activity.MatchedIterativeActivity;
import uk.org.ogsadai.activity.io.ActivityInput;
import uk.org.ogsadai.activity.io.ActivityPipeProcessingException;
import uk.org.ogsadai.activity.io.BlockWriter;
import uk.org.ogsadai.activity.io.ControlBlock;
import uk.org.ogsadai.activity.io.ListIterator;
import uk.org.ogsadai.activity.io.PipeClosedException;
import uk.org.ogsadai.activity.io.PipeIOException;
import uk.org.ogsadai.activity.io.PipeTerminatedException;
import uk.org.ogsadai.activity.io.TypedActivityInput;
import uk.org.ogsadai.activity.io.TypedListActivityInput;

/**
 * Tutorial activity that processes the data in a list.
 * <p>
 * The activity has two inputs.  One called 'greeting' takes <tt>String</tt>
 * objects and one called 'names' that takes a list of <tt>String</tt> objects.
 * The output (called 'output') is a list of <tt>String</tt> objects that
 * is similar to the input list except that the <tt>String</tt> values are
 * now prefixed by the value of the 'greeting' input followed by a space.
 * <p>
 * For example, if the greeting input is <tt>"Hello"</tt> and the values in
 * the input 'names' list are <tt>"Rod"</tt>, <tt>"Jane"</tt> and 
 * <tt>"Freddy"</tt> then the values on the output list will be 
 * <tt>"Hello Rod"</tt>, <tt>"Hello Jane"</tt> and <tt>"Hello Freddy"</tt>.
 *
 * @author The OGSA-DAI Project Team
 */
public class ListProcessingActivity extends MatchedIterativeActivity
{
    /** Copyright notice */
    private static final String COPYRIGHT_NOTICE = 
        "Copyright (c) The University of Edinburgh,  2007.";

    /** Block writer used to write the activity's output */
    private BlockWriter mOutputBlockWriter;

    /**
     * Provides the input details.
     */
    protected ActivityInput[] getIterationInputs()
    {
        return new ActivityInput[] {
            new TypedActivityInput("greeting", String.class),
            new TypedListActivityInput("names", String.class)};
    }

    /**
     * Pre-processing.
     */
    protected void preprocess() throws ActivityUserException,
        ActivityProcessingException, ActivityTerminatedException
    {
        validateOutput("output");
        mOutputBlockWriter = getOutput("output");
    }

    /**
     * Process each iteration.
     */
    protected void processIteration(Object[] iterationData)
        throws ActivityProcessingException, ActivityTerminatedException,
        ActivityUserException
    {
        try
        {
            String greeting = (String) iterationData[0];
            ListIterator values = (ListIterator) iterationData[1];
    
            // Output the list begin marker
            mOutputBlockWriter.write(ControlBlock.LIST_BEGIN);
            
            // Process the contents of the list
            String name;
            while( (name = (String) values.nextValue()) != null)
            {
                mOutputBlockWriter.write(greeting + " " + name);
            }
        
            // Output the list end marker
            mOutputBlockWriter.write(ControlBlock.LIST_END);
        }
        catch (PipeClosedException e)
        {
            // Consumer does not want any more data, just stop.
        }
        catch (PipeIOException e)
        {
            throw new ActivityPipeProcessingException(e);
        }
        catch (PipeTerminatedException e)
        {
            throw new ActivityTerminatedException();
        }
    }

    /**
     * Post-processing.
     */
    protected void postprocess() throws ActivityUserException,
        ActivityProcessingException, ActivityTerminatedException
    {
        // No post-processing
    }
}

28.2. Developing the client-side activity

The client-side implementation of the List Processing activity is much the same as the client-side implementation of the List Reduction activity. The only major difference is that lists, rather than single values, are now written to the output, so the nextOutput method needs to reflect this. The nextOutput method is implemented as:

 
/**
 * Gets the next output value.
 * 
 * @return a <tt>DataIterator</tt> that gives access to a list of
 *         <tt>java.lang.String</tt> objects.
 *         
 * @throws DataStreamErrorException 
 *             if there is an error on the data stream.
 * @throws UnexpectedDataValueException
 *             if there is an unexpected data value on the data stream.
 * @throws DataSourceUsageException
 *             if there is an error reading from a data source.
 */
public DataIterator nextOutput()
    throws DataStreamErrorException, 
           UnexpectedDataValueException, 
           DataSourceUsageException
{   
    return new DataListIterator(
        mOutput.getDataValueIterator(),String.class);
}

This method returns a DataIterator object that can be used to access the String objects in the list. Because we know the list should contain only String objects, we can specify this in the second parameter to the DataListIterator constructor. The DataListIterator class will then handle the reporting of any errors if the objects received are not Strings.
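The idea of passing the expected class to an iterator so that it can reject unexpected values can be illustrated with a small stand-alone sketch. This is not how DataListIterator is implemented. The CheckedIterator class below is hypothetical, and it reports a mismatch with a ClassCastException purely for illustration, whereas the client toolkit declares its own exception types such as UnexpectedDataValueException.

```java
import java.util.Arrays;
import java.util.Iterator;

// Hypothetical illustration of a type-checking iterator; not OGSA-DAI code.
public class TypedIteratorSketch
{
    /** Iterator wrapper that rejects values not of the expected class. */
    public static class CheckedIterator implements Iterator<Object>
    {
        private final Iterator<?> mSource;
        private final Class<?> mExpected;

        public CheckedIterator(Iterator<?> source, Class<?> expected)
        {
            mSource = source;
            mExpected = expected;
        }

        public boolean hasNext()
        {
            return mSource.hasNext();
        }

        public Object next()
        {
            Object value = mSource.next();
            // isInstance returns false for null, so null is rejected too
            if (!mExpected.isInstance(value))
            {
                throw new ClassCastException(
                    "Expected " + mExpected.getName() + " but got: " + value);
            }
            return value;
        }

        public void remove()
        {
            throw new UnsupportedOperationException();
        }
    }

    public static void main(String[] args)
    {
        Iterator<Object> iterator = new CheckedIterator(
            Arrays.<Object>asList("Hello Rod", "Hello Jane").iterator(),
            String.class);
        while (iterator.hasNext())
        {
            // The cast is safe because the iterator has checked the type
            System.out.println((String) iterator.next());
        }
    }
}
```

Centralising the check in the iterator means client code such as the application in Section 28.3 can cast each value to String without its own type tests.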

The complete client-toolkit activity implementation is:

// Copyright (c) The University of Edinburgh, 2007.
// See OGSA-DAI-Licence.txt for licencing information.

package uk.org.ogsadai.tutorial.activity.client;

import uk.org.ogsadai.activity.ActivityName;
import uk.org.ogsadai.client.toolkit.Activity;
import uk.org.ogsadai.client.toolkit.ActivityOutput;
import uk.org.ogsadai.client.toolkit.DataIterator;
import uk.org.ogsadai.client.toolkit.DataListIterator;
import uk.org.ogsadai.client.toolkit.SingleActivityOutput;
import uk.org.ogsadai.client.toolkit.activity.ActivityInput;
import uk.org.ogsadai.client.toolkit.activity.BaseActivity;
import uk.org.ogsadai.client.toolkit.activity.SimpleActivityInput;
import uk.org.ogsadai.client.toolkit.activity.SimpleActivityOutput;
import uk.org.ogsadai.client.toolkit.exception.ActivityIOIllegalStateException;
import uk.org.ogsadai.client.toolkit.exception.DataSourceUsageException;
import uk.org.ogsadai.client.toolkit.exception.DataStreamErrorException;
import uk.org.ogsadai.client.toolkit.exception.UnexpectedDataValueException;
import uk.org.ogsadai.data.ListBegin;
import uk.org.ogsadai.data.ListEnd;
import uk.org.ogsadai.data.StringData;

/**
 * Client toolkit activity used to call the ListProcessing activity.
 *
 * @author The OGSA-DAI Project Team
 */
public class ListProcessing extends BaseActivity implements Activity
{
    /** Copyright notice */
    private static final String COPYRIGHT_NOTICE = 
        "Copyright (c) The University of Edinburgh,  2007.";

    /** Default activity name */
    public final static ActivityName DEFAULT_ACTIVITY_NAME = 
        new ActivityName("uk.org.ogsadai.tutorial.ListProcessing");

    /** The greeting input */
    private ActivityInput mGreetingInput;

    /** The names input */
    private ActivityInput mNamesInput;

    /** The activity output */
    private ActivityOutput mOutput;

    /**
     * Constructor.
     */
    public ListProcessing()
    {
        super(DEFAULT_ACTIVITY_NAME);
        mGreetingInput = new SimpleActivityInput("greeting");
        mNamesInput = new SimpleActivityInput("names");
        mOutput = new SimpleActivityOutput("output");
    }
    
    /**
     * Adds a value to the greeting input.
     * 
     * @param greeting the greeting.
     */
    public void addGreeting(String greeting)
    {
        mGreetingInput.add(new StringData(greeting));
    }
    
    /**
     * Connects the greeting input to the given output.
     * 
     * @param output output to connect to.
     */
    public void connectGreetingInput(SingleActivityOutput output)
    {
        mGreetingInput.connect(output);
    }

    /**
     * Adds a list of names to the names input.
     * 
     * @param names the names to include in the list.
     */
    public void addNames(String[] names)
    {
        mNamesInput.add(ListBegin.VALUE);
        for (int i=0; i<names.length; ++i)
        {
            mNamesInput.add(new StringData(names[i]));
        }
        mNamesInput.add(ListEnd.VALUE);
    }
    
    /**
     * Connects the names input to the given output.
     * 
     * @param output output to connect to.
     */
    public void connectNamesInput(SingleActivityOutput output)
    {
        mNamesInput.connect(output);
    }

    
    /**
     * Gets the output so that it can be connected to the input of other
     * activities.
     * 
     * @return the activity output.
     */
    public SingleActivityOutput getOutput()
    {
        return mOutput.getSingleActivityOutputs()[0];
    }
    
    /**
     * Checks whether the activity has a next output value.
     * 
     * @return <tt>true</tt> if there is another output value, <tt>false</tt>
     *         otherwise.
     *         
     * @throws DataStreamErrorException 
     *             if there is an error on the data stream.
     * @throws UnexpectedDataValueException
     *             if there is an unexpected data value on the data stream.
     * @throws DataSourceUsageException
     *             if there is an error reading from a data source.
     */
    public boolean hasNextOutput()
        throws DataStreamErrorException, 
               UnexpectedDataValueException, 
               DataSourceUsageException
    {
        return mOutput.getDataValueIterator().hasNext();
    }
    
    /**
     * Gets the next output value.
     * 
     * @return a <tt>DataIterator</tt> that gives access to a list of
     *         <tt>java.lang.String</tt> objects.
     *         
     * @throws DataStreamErrorException 
     *             if there is an error on the data stream.
     * @throws UnexpectedDataValueException
     *             if there is an unexpected data value on the data stream.
     * @throws DataSourceUsageException
     *             if there is an error reading from a data source.
     */
    public DataIterator nextOutput()
        throws DataStreamErrorException, 
               UnexpectedDataValueException, 
               DataSourceUsageException
    {   
        return new DataListIterator(
            mOutput.getDataValueIterator(),String.class);
    }
    
    /**
     * Gets the activity inputs.
     */
    protected ActivityInput[] getInputs()
    {
        return new ActivityInput[]{mGreetingInput, mNamesInput};
    }

    /**
     * Gets the activity outputs.
     */
    protected ActivityOutput[] getOutputs()
    {
        return new ActivityOutput[]{mOutput};
    }

    /**
     * Validates the data of the inputs and outputs.
     */
    protected void validateIOState() throws ActivityIOIllegalStateException
    {
        // No further validation to do
    }
    
}

28.3. Developing the client application

An example client application that uses this activity is shown here. You may have to change the base services URL in the main method to match your server. The code as listed uses http://localhost:8080/wsrf/services/dai/; for an OGSA-DAI Axis installation the default is:

  • OGSA-DAI Axis Default: http://localhost:8080/dai/services/

// Copyright (c) The University of Edinburgh, 2007.
// See OGSA-DAI-Licence.txt for licencing information.

package uk.org.ogsadai.tutorial.activity.client.apps;

import java.net.URL;

import uk.org.ogsadai.client.toolkit.DataIterator;
import uk.org.ogsadai.client.toolkit.DataRequestExecutionResource;
import uk.org.ogsadai.client.toolkit.PipelineWorkflow;
import uk.org.ogsadai.client.toolkit.RequestExecutionType;
import uk.org.ogsadai.client.toolkit.ServerProxy;
import uk.org.ogsadai.client.toolkit.activities.delivery.DeliverToRequestStatus;
import uk.org.ogsadai.client.toolkit.exception.RequestExecutionException;
import uk.org.ogsadai.resource.ResourceID;
import uk.org.ogsadai.tutorial.activity.client.ListProcessing;

/**
 * Application for the List Processing activity tutorial.
 *
 * @author The OGSA-DAI Project Team
 */
public class ListProcessingApp
{
    /** Copyright notice */
    private static final String COPYRIGHT_NOTICE = 
        "Copyright (c) The University of Edinburgh,  2007.";

    /**
     * Main method.
     * 
     * @param args command line arguments.
     * 
     * @throws Exception if an unexpected error occurs
     */
    public static void main(String[] args) throws Exception
    {
        URL serverBaseUrl = new URL("http://localhost:8080/wsrf/services/dai/");
        ResourceID drerId = new ResourceID("DataRequestExecutionResource");
        
        ServerProxy serverProxy = new ServerProxy();
        serverProxy.setDefaultBaseServicesURL(serverBaseUrl);
        
        DataRequestExecutionResource drer = 
            serverProxy.getDataRequestExecutionResource(drerId);
        
        // Create the activities
        ListProcessing listProcessing = new ListProcessing();
        DeliverToRequestStatus deliverToRequestStatus = 
            new DeliverToRequestStatus();
        
        // Add some values to the ListProcessing activity inputs
        listProcessing.addGreeting("Hello");
        listProcessing.addGreeting("Hi");
        listProcessing.addNames(
            new String[]{ "Roderick", "Frederick" } );
        listProcessing.addNames(
            new String[]{ "Rod", "Jane", "Freddy" } );
        
        // Connect the output of ListProcessing to DeliverToRequestStatus
        deliverToRequestStatus.connectInput(listProcessing.getOutput());
        
        // Create the workflow
        PipelineWorkflow pipeline = new PipelineWorkflow();
        pipeline.add(listProcessing);
        pipeline.add(deliverToRequestStatus);
        
        // Execute the workflow
        try
        {
            drer.execute(pipeline, RequestExecutionType.SYNCHRONOUS);
        }
        catch( RequestExecutionException e)
        {
            System.out.println("There was an error executing the workflow");
            System.out.println(e.getRequestResource().getRequestStatus());
            throw e;
        }
        
        // Get the result and display it
        while(listProcessing.hasNextOutput())
        {
            DataIterator dataIterator = listProcessing.nextOutput();
            System.out.println("list begin");
            while( dataIterator.hasNext())
            {
                System.out.println((String) dataIterator.next());
            }
            System.out.println("list end");
        }
    }
}

With the server base URL set to match your server installation, the output from this client application is:

list begin
Hello Roderick
Hello Frederick
list end
list begin
Hi Rod
Hi Jane
Hi Freddy
list end
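The output shows that the first greeting was applied to the first list and the second greeting to the second list: each iteration consumes one value from greeting and one list from names. A plain-Java sketch of that pairing is given below. The class MatchedIterationSketch and its run method are hypothetical, and the sketch simply rejects inputs with unequal counts, which is an assumption of the sketch rather than a statement about how the server behaves in that case.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of per-iteration input matching; not OGSA-DAI code.
public class MatchedIterationSketch
{
    /** Pairs the i-th greeting with the i-th list of names. */
    public static List<List<String>> run(
        List<String> greetings, List<List<String>> nameLists)
    {
        if (greetings.size() != nameLists.size())
        {
            // Assumption of this sketch only
            throw new IllegalArgumentException("Input counts differ");
        }
        List<List<String>> outputs = new ArrayList<List<String>>();
        for (int i = 0; i < greetings.size(); ++i)
        {
            List<String> output = new ArrayList<String>();
            for (String name : nameLists.get(i))
            {
                output.add(greetings.get(i) + " " + name);
            }
            outputs.add(output);
        }
        return outputs;
    }

    public static void main(String[] args)
    {
        List<List<String>> outputs = run(
            Arrays.asList("Hello", "Hi"),
            Arrays.asList(
                Arrays.asList("Roderick", "Frederick"),
                Arrays.asList("Rod", "Jane", "Freddy")));

        // Print in the same layout as the client application
        for (List<String> list : outputs)
        {
            System.out.println("list begin");
            for (String value : list)
            {
                System.out.println(value);
            }
            System.out.println("list end");
        }
    }
}
```

Run with the same inputs as the client application, this sketch prints the same lines as the output listed above.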

To run this client successfully you must install the server-side activity on your server. To do this, compile your server-side activity into a jar (for example, ListProcessing.jar), then follow the instructions in Section 16.1.8, “Deploying an activity” and Section 16.1.9, “Extending the supported activities of a resource”. When following these instructions, use the following values:

  • dai.activity.id: uk.org.ogsadai.tutorial.ListProcessing
  • dai.activity.name: uk.org.ogsadai.tutorial.ListProcessing
  • dai.activity.class: uk.org.ogsadai.tutorials.activity.ListProcessingActivity
  • dai.activity.description: "List Processing activity"

The dai.activity.id and dai.activity.name values must match the default activity name used in the constructor of the client-side activity implementation class ListProcessing. The dai.activity.class value must be the fully qualified name of the server-side activity class, that is, its package name followed by its class name. The dai.activity.description can be any suitable textual description.