Chapter 27. List reduction example activity

27.1. Developing the server-side activity
27.2. Developing the client-side activity
27.3. Developing the client application

This example activity, called List Reduction, processes data in a list reducing the list to a single value that is then output. The activity has two inputs. The first input is called initialValue and expects to receive objects of type Integer. The second input is called values and expects to receive lists of type Integer. There is one output called output. The activity reads a value from the initialValue input and a list of values from the values input and adds all the values in the list to the initial value. The result of this sum is written as an Integer to the output.

27.1. Developing the server-side activity

We implement the server-side activity by extending from the MatchedIterativeActivity base class. We must implement the getIterationInputs method to return details of the inputs to the activity. This is implemented as:

protected ActivityInput[] getIterationInputs()
{
    return new ActivityInput[] {
        new TypedActivityInput("initialValue", Integer.class),
        new TypedListActivityInput("values", Integer.class)};
}

We have used TypedActivityInput for the initialValue input and specified that blocks must be of type Integer. For the values input that expects to receive lists of Integer objects we use the TypedListActivityInput class to specify that we expect a list and provide Integer.class as a parameter value to specify that the objects inside the list should be of type Integer.

We validate the output and obtain a corresponding BlockWriter in exactly the same way as we did in the previous examples.

We now have to process this data inside the processIteration method. Ignoring exceptions for simplicity this is implemented as:

protected void processIteration(Object[] iterationData)
{
    Integer initialValue = (Integer) iterationData[0];
    ListIterator values = (ListIterator) iterationData[1];

    // Get the initial value
    int value = initialValue.intValue();

    // Process all the integers in the list
    Integer nextValue;
    while( (nextValue = (Integer) values.nextValue()) != null)
    {
        value += nextValue.intValue();
    }
    
    // Output the final value
    mOutputBlockWriter.write(new Integer(value));
}

Note that because we provided two items in the array returned by getIterationInputs we get two items in the array that is passed to processIteration. The first of these items is the initialValue input and the second is an iterator that gives access to the data in the list in the values input. The rest of the code simply loops through the list getting each value and doing the required calculation. Note that the end of the list is indicated by the nextValue method returning null.

It is important to note that the ListIterator object passed to the processIteration method in this example allows us to stream the values from the list. The base class does not read the whole list into memory and then pass an iterator to this data to processIteration. Instead, the base class simply reads the list begin marker and passes the ListIterator object to the processIteration method. This ListIterator object is implemented as a simple wrapper around a BlockReader. Calls to read data from ListIterator may therefore block in the same way that calls to read data from a BlockReader can block. These implementation details allow the activity to process lists containing millions of objects without ever having to store the whole list in memory.

Note that each iteration requires one value from the initialValue input and one whole list from the values input. What would happen if the initialValue input receives a different number of values from the number of lists received by the values input? These situations where inputs are unmatched are handled by the MatchedIterativeActivity base class. For a more details description of what it means for an activity to be matched see Section 24.2.1, “Some definitions”.

The complete implementation of the server-side List Reduction activity is:

// Copyright (c) The University of Edinburgh, 2007.
//See OGSA-DAI-Licence.txt for licencing information.

package uk.org.ogsadai.tutorials.activity;

import uk.org.ogsadai.activity.ActivityProcessingException;
import uk.org.ogsadai.activity.ActivityTerminatedException;
import uk.org.ogsadai.activity.ActivityUserException;
import uk.org.ogsadai.activity.MatchedIterativeActivity;
import uk.org.ogsadai.activity.io.ActivityInput;
import uk.org.ogsadai.activity.io.ActivityPipeProcessingException;
import uk.org.ogsadai.activity.io.BlockWriter;
import uk.org.ogsadai.activity.io.ListIterator;
import uk.org.ogsadai.activity.io.PipeClosedException;
import uk.org.ogsadai.activity.io.PipeIOException;
import uk.org.ogsadai.activity.io.PipeTerminatedException;
import uk.org.ogsadai.activity.io.TypedActivityInput;
import uk.org.ogsadai.activity.io.TypedListActivityInput;

/**
 * Tutorial activity that reduces the data in a list to something that 
 * summarizes that data.
 * 
 * The activity has two inputs.  One called 'initialValue' is extends Integer
 * objects and one called 'values' that expects a list of Integer objects.
 * The output for each iteration is the initial value plus the value of each
 * of the integers in the list.
 *
 * @author The OGSA-DAI Project Team
 */
public class ListReductionActivity extends MatchedIterativeActivity
{
    /** Copyright notice */
    private static final String COPYRIGHT_NOTICE = 
        "Copyright (c) The University of Edinburgh,  2007.";

    /** Block writer used to write the activity's output */
    private BlockWriter mOutputBlockWriter;

    protected ActivityInput[] getIterationInputs()
    {
        return new ActivityInput[] {
            new TypedActivityInput("initialValue", Integer.class),
            new TypedListActivityInput("values", Integer.class)};
    }

    /**
     * Pre-processing.
     */
    protected void preprocess() throws ActivityUserException,
        ActivityProcessingException, ActivityTerminatedException
    {
        validateOutput("output");
        mOutputBlockWriter = getOutput("output");
    }

    /**
     * Process each iteration.
     */
    protected void processIteration(Object[] iterationData)
        throws ActivityProcessingException, ActivityTerminatedException,
        ActivityUserException
    {
        try
        {
            Integer initialValue = (Integer) iterationData[0];
            ListIterator values = (ListIterator) iterationData[1];
    
            // Get the initial value
            int value = initialValue.intValue();
    
            // Process all the integers in the list
            Integer nextValue;
            while( (nextValue = (Integer) values.nextValue()) != null)
            {
                value += nextValue.intValue();
            }
        
            // Output the final value
            mOutputBlockWriter.write(new Integer(value));
        }
        catch (PipeClosedException e)
        {
            // Consumer does not want any more data, just stop.
        }
        catch (PipeIOException e)
        {
            throw new ActivityPipeProcessingException(e);
        }
        catch (PipeTerminatedException e)
        {
            throw new ActivityTerminatedException();
        }
    }

    /**
     * No post-processing.
     */
    protected void postprocess() throws ActivityUserException,
        ActivityProcessingException, ActivityTerminatedException
    {
        // No post-processing
    }
}

27.2. Developing the client-side activity

The client-side activity implementation for the List Reduction is very similar to the client-side activity implementation for the Hello Name activity. The only method that requires explanation is addValues which is implemented as:

/**
 * Adds a list of values to the values input.
 * 
 * @param values the values to include in the list.
 */
public void addValues(int[] values)
{
    mValuesInput.add(ListBegin.VALUE);
    for (int i=0; i<values.length; ++i)
    {
        mValuesInput.add(new IntegerData(values[i]));
    }
    mValuesInput.add(ListEnd.VALUE);
}

This method adds data to the values input. The values input expects to receive lists. Lists are identified in OGSA-DAI using list begin and list end markers. We therefore must place these markers around the data to form the list. This method hides the use of these list markers from the client application developer.

The complete client-toolkit activity implementation is:

// Copyright (c) The University of Edinburgh, 2007.

//See OGSA-DAI-Licence.txt for licencing information.

package uk.org.ogsadai.tutorial.activity.client;

import uk.org.ogsadai.activity.ActivityName;
import uk.org.ogsadai.client.toolkit.Activity;
import uk.org.ogsadai.client.toolkit.ActivityOutput;
import uk.org.ogsadai.client.toolkit.SingleActivityOutput;
import uk.org.ogsadai.client.toolkit.activity.ActivityInput;
import uk.org.ogsadai.client.toolkit.activity.BaseActivity;
import uk.org.ogsadai.client.toolkit.activity.SimpleActivityInput;
import uk.org.ogsadai.client.toolkit.activity.SimpleActivityOutput;
import uk.org.ogsadai.client.toolkit.exception.ActivityIOIllegalStateException;
import uk.org.ogsadai.client.toolkit.exception.DataSourceUsageException;
import uk.org.ogsadai.client.toolkit.exception.DataStreamErrorException;
import uk.org.ogsadai.client.toolkit.exception.UnexpectedDataValueException;
import uk.org.ogsadai.data.IntegerData;
import uk.org.ogsadai.data.ListBegin;
import uk.org.ogsadai.data.ListEnd;

/**
 * Client toolkit activity used to call the ListReduction activity.
 *
 * @author The OGSA-DAI Project Team
 */
public class ListReduction extends BaseActivity implements Activity
{
    /** Copyright notice */
    private static final String COPYRIGHT_NOTICE = 
        "Copyright (c) The University of Edinburgh,  2007.";

    /** Default activity name */
    public final static ActivityName DEFAULT_ACTIVITY_NAME = 
        new ActivityName("uk.org.ogsadai.tutorial.ListReduction");

    /** The initial value */
    private ActivityInput mInitialValueInput;

    /** The values */
    private ActivityInput mValuesInput;

    /** The activity output */
    private ActivityOutput mOutput;

    /**
     * Constructor.
     */
    public ListReduction()
    {
        super(DEFAULT_ACTIVITY_NAME);
        mInitialValueInput = new SimpleActivityInput("initialValue");
        mValuesInput = new SimpleActivityInput("values");
        mOutput = new SimpleActivityOutput("output");
    }
    
    /**
     * Adds an value to the initialValue input.
     * 
     * @param initialValue the initial value.
     */
    public void addInitialValue(int initialValue)
    {
        mInitialValueInput.add(new IntegerData(initialValue));
    }
    
    /**
     * Connects the initialValue input to the given output.
     * 
     * @param output output to connect to.
     */
    public void connectInitialValueInput(SingleActivityOutput output)
    {
        mInitialValueInput.connect(output);
    }

    /**
     * Adds a list of values to the values input.
     * 
     * @param values the values to include in the list.
     */
    public void addValues(int[] values)
    {
        mValuesInput.add(ListBegin.VALUE);
        for (int i=0; i<values.length; ++i)
        {
            mValuesInput.add(new IntegerData(values[i]));
        }
        mValuesInput.add(ListEnd.VALUE);
    }
    
    /**
     * Connects the values input to the given output.
     * 
     * @param output output to connect to.
     */
    public void connectValuesInput(SingleActivityOutput output)
    {
        mValuesInput.connect(output);
    }

    
    /**
     * Gets the output so that it can be connected to the input of other
     * activities.
     * 
     * @return the activity output.
     */
    public SingleActivityOutput getOutput()
    {
        return mOutput.getSingleActivityOutputs()[0];
    }
    
    /**
     * Gets if the activity has a next output value.
     * 
     * @return <tt>true</tt> if there is another output value, <tt>false</tt>
     *         otherwise.
     *         
     * @throws DataStreamErrorException 
     *             if there is an error on the data stream.
     * @throws UnexpectedDataValueException
     *             if there is an unexpected data value on the data stream.
     * @throws DataSourceUsageException
     *             if there is an error reading from a data source.
     */
    public boolean hasNextOutput()
        throws DataStreamErrorException, 
               UnexpectedDataValueException, 
               DataSourceUsageException
    {
        return mOutput.getDataValueIterator().hasNext();
    }
    
    /**
     * Gets the next output value.
     * 
     * @return the next output value.
     *         
     * @throws DataStreamErrorException 
     *             if there is an error on the data stream.
     * @throws UnexpectedDataValueException
     *             if there is an unexpected data value on the data stream.
     * @throws DataSourceUsageException
     *             if there is an error reading from a data source.
     */
    public int nextOutput()
        throws DataStreamErrorException, 
               UnexpectedDataValueException, 
               DataSourceUsageException
    {
        return mOutput.getDataValueIterator().nextAsInt();
    }
    
    /**
     * Gets the activity inputs.
     */
    protected ActivityInput[] getInputs()
    {
        return new ActivityInput[]{mInitialValueInput, mValuesInput};
    }

    /**
     * Gets the activity outputs.
     */
    protected ActivityOutput[] getOutputs()
    {
        return new ActivityOutput[]{mOutput};
    }

    /**
     * Validates the data of the inputs and outputs.
     */
    protected void validateIOState() throws ActivityIOIllegalStateException
    {
        // No further validation to do
    }
    
}

27.3. Developing the client application

An example client application that uses this activity is shown here. You may have to change the base services URL in the main method depending on your server. By default this is:

  • OGSA-DAI GT Default: http://localhost:8080/wsrf/services/dai/

// Copyright (c) The University of Edinburgh, 2007.
// See OGSA-DAI-Licence.txt for licencing information.

package uk.org.ogsadai.tutorial.activity.client.apps;

import java.net.URL;

import uk.org.ogsadai.client.toolkit.DataRequestExecutionResource;
import uk.org.ogsadai.client.toolkit.PipelineWorkflow;
import uk.org.ogsadai.client.toolkit.RequestExecutionType;
import uk.org.ogsadai.client.toolkit.ServerProxy;
import uk.org.ogsadai.client.toolkit.activities.delivery.DeliverToRequestStatus;
import uk.org.ogsadai.client.toolkit.exception.RequestExecutionException;
import uk.org.ogsadai.resource.ResourceID;
import uk.org.ogsadai.tutorial.activity.client.ListReduction;

/**
 * Application for the ListReduction activity tutorial.
 *
 * @author The OGSA-DAI Project Team
 */
public class ListReductionApp
{
    /** Copyright notice */
    private static final String COPYRIGHT_NOTICE = 
        "Copyright (c) The University of Edinburgh,  2007.";

    /**
     * Main method.
     * 
     * @param args command line arguments.
     * 
     * @throws Exception if an unexpected error occurs
     */
    public static void main(String[] args) throws Exception
    {
        URL serverBaseUrl = new URL("http://localhost:8080/wsrf/services/dai/");
        ResourceID drerId = new ResourceID("DataRequestExecutionResource");
        
        ServerProxy serverProxy = new ServerProxy();
        serverProxy.setDefaultBaseServicesURL(serverBaseUrl);
        
        DataRequestExecutionResource drer = 
            serverProxy.getDataRequestExecutionResource(drerId);
        
        // Create the activities
        ListReduction listReduction = new ListReduction();
        DeliverToRequestStatus deliverToRequestStatus = 
            new DeliverToRequestStatus();
        
        // Add some values to the ListReduction activity inputs
        listReduction.addInitialValue(100);
        listReduction.addInitialValue(200);
        listReduction.addValues(
            new int[]{ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10});
        listReduction.addValues(
            new int[]{ 1, 10, 100, 1000});
        
        // Connect the output of ListReduction to DeliverToRequestStatus
        deliverToRequestStatus.connectInput(listReduction.getOutput());
        
        // Create the workflow
        PipelineWorkflow pipeline = new PipelineWorkflow();
        pipeline.add(listReduction);
        pipeline.add(deliverToRequestStatus);
        
        // Excecute the workflow
        try
        {
            drer.execute(pipeline, RequestExecutionType.SYNCHRONOUS);
        }
        catch( RequestExecutionException e)
        {
            System.out.println("There was an error executing the workflow");
            System.out.println(e.getRequestResource().getRequestStatus());
            throw e;
        }
        
        // Get the result and display it
        while( listReduction.hasNextOutput())
        {
            System.out.println(listReduction.nextOutput());
        }
    }

}

Note that you may need to alter the value used for the server base URL to correspond to your server installation. The output from this client is:

155
1311

To get this client to successfully run you must install the server-side activity on your server. To do this compile your server side activity to a jar, for example, ListReduction.jar then follow the instructions at Section 16.1.8, “Deploying an activity” and Section 16.1.9, “Extending the supported activities of a resource”. When following these instructions use the following values:

  • dai.activity.id: uk.org.ogsadai.tutorial.ListReduction
  • dai.activity.name: uk.org.ogsadai.tutorial.ListReduction
  • dai.activity.class: uk.org.ogsadai.tutorials.activity.ListReductionActivity
  • dai.activity.description: "List Reduction activity"

The dai.activity.id and dai.activity.name values must match the default activity name used in the constructor of the client-side activity implementation class ListReduction. The dai.activity.class value must be the full class name of the server-side activity class. The dai.activity.description can be any suitable textual description.