Chapter 26. Hello name example activity

26.1. Developing the server-side activity
26.2. Developing the client-side activity
26.3. Developing the client application

This example activity has one input and one output. The input is called input and expects to receive objects of type java.lang.String. The output is called output. For each input string we receive we will write a String to the output that is the input string preceded by "Hello ".

26.1. Developing the server-side activity

We will implement the server-side activity in two different ways. Firstly, we will implement the activity by extending the ActivityBase base class as we did in the Hello World example, then we will implement the activity again this time extending the MatchedIterativeActivity base class.

When we extend from the ActivityBase base class the implementation is very similar to that of the Hello World activity. We need to add a new member variable to store the BlockReader we will use to read blocks from the input:

private BlockReader mInputBlockReader;

Now we need to validate that we have the required input and instantiate this member variable. This is done inside the process method:

validateInput("input");
mInputBlockReader = getInput("input");

We validate the output and obtain a corresponding BlockWriter in exactly the same way as we did in the Hello World example.

We can now read blocks from the input using the BlockReader interface's read method. This method has the following signature:

/**
 * Reads a block of data from the pipe. This operation blocks until a block
 * of data is available or the pipe is closed.
 * 
 * @throws PipeIOException
 *             if a problem occurs preventing a data block from being read
 *             from the pipe
 * @throws PipeTerminatedException
 *             if the pipe read operation is interrupted indicating that the
 *             request containing the pipe is being terminated
 * @return the data block or ControlBlock.NO_MORE_DATA if the
 *         pipe has been closed and there is no more data to read
 * @throws DataError 
 *             if the producer has signaled and error and the last 
 *             available block has been read by the consumer.
 */
 public Object read() 
     throws PipeIOException, PipeTerminatedException, DataError;

As described in the comment this method will block until the pipe has data to be read. The method will return a ControlBlock.NO_MORE_DATA object if there is no more data to be read. The main processing loop of the process method reads from the input and writes to the output until there is no more data to process. The code is:

while( (block = mInputBlockReader.read()) != ControlBlock.NO_MORE_DATA)
{
    if (block instanceof String)
    {
        String name = (String) block;
        mOutputBlockWriter.write("Hello " + name);
    }
    else
    {
        // User input error, this activity only supports Strings.
        throw new InvalidInputValueException(
            "input",            // input name
            String.class,       // expected class 
            block.getClass());  // actual class
    }
}

Note that the activity only expects to receive objects of type String in the input. If the block is not of type String an exception is thrown.

The whole server-side activity implementation is:

// Copyright (c) The University of Edinburgh, 2007.
//See OGSA-DAI-Licence.txt for licencing information.

package uk.org.ogsadai.tutorials.activity;

import uk.org.ogsadai.activity.ActivityBase;
import uk.org.ogsadai.activity.ActivityProcessingException;
import uk.org.ogsadai.activity.ActivityTerminatedException;
import uk.org.ogsadai.activity.ActivityUserException;
import uk.org.ogsadai.activity.io.ActivityPipeProcessingException;
import uk.org.ogsadai.activity.io.BlockReader;
import uk.org.ogsadai.activity.io.BlockWriter;
import uk.org.ogsadai.activity.io.ControlBlock;
import uk.org.ogsadai.activity.io.InvalidInputValueException;
import uk.org.ogsadai.activity.io.PipeClosedException;
import uk.org.ogsadai.activity.io.PipeIOException;
import uk.org.ogsadai.activity.io.PipeTerminatedException;

/**
 * Simple activity that receives names and outputs "Hello [name]" for each
 * name received.
 *
 * @author The OGSA-DAI Project Team
 */
public class HelloNameActivity extends ActivityBase
{
    /** Block reader using to read the activity's input */
    private BlockReader mInputBlockReader;
    
    /** Block writer used to write the activity's output */
    private BlockWriter mOutputBlockWriter;
    
    /**
     * Runs the activity.
     */
    public void process() 
        throws ActivityUserException,
               ActivityProcessingException, 
               ActivityTerminatedException
    {
        // Validate inputs and outputs
        validateInput("input");
        validateOutput("output");
        
        // Get the inputs and outputs
        mInputBlockReader = getInput("input");
        mOutputBlockWriter = getOutput("output");
        
        try
        {   
            Object block;
            while( (block = mInputBlockReader.read()) != 
                      ControlBlock.NO_MORE_DATA)
            {
                if (block instanceof String)
                {
                    String name = (String) block;
                    mOutputBlockWriter.write("Hello " + name);
                }
                else
                {
                    // User input error, this activity only supports Strings.
                    throw new InvalidInputValueException(
                        "input",            // input name
                        String.class,       // expected class 
                        block.getClass());  // actual class
                }
            }
        }
        catch (PipeClosedException e)
        {
            // Consumer does not want any more data, just stop.
        }
        catch (PipeIOException e)
        {
            throw new ActivityPipeProcessingException(e);
        }
        catch (PipeTerminatedException e)
        {
            throw new ActivityTerminatedException();
        }
    }
}

Implementing the loop that reads the input and throws the errors when the inputs are of the wrong type can be a repetitive task. The MatchedIterativeActivity base class can be used instead of the ActivityBase base class to remove some of this burden from the programmer. When we extend from this class we have to implement four methods. The first method is called getIterationInputs and has the following signature:

/**
 * Gets the iteration inputs.
 * 
 * @return an array of ActivityInput objects
 */
protected abstract ActivityInput[] getIterationInputs();

This method is used to tell the base calls about the activity's inputs. In this example we have a single input called input that expects to receive data of type String. We implement this method as:

/**
 * Returns the details about the inputs.
 */
protected ActivityInput[] getIterationInputs()
{
    return new ActivityInput[] {
        new TypedActivityInput("input", String.class)};
}

There are a variety of different concrete classes that implement the ActivityInput interface. You should choose one appropriate to the type of input expected. Here we use a TypedActivityInput so that we can specify that all inputs should be of type String. The base class will therefore implement the appropriate error handling functionality if the blocks received on this input are not of type String.

The other three methods we have to implement are called preprocess, processIteration and postprocess. The MatchedIterativeActivity base class's process loop implements the following algorithm:

preprocess();
WHILE more processing to do
  processIteration(Object[] iterationInputs);
END-WHILE
postprocess();
cleanUp();

The preprocess method is called when the activity is started. The base class reads the input values and the processInteration method is called once for each input value. Finally when there is no more data the postprocess method is called. An empty implementation of the cleanUp method is provided in the base class and can be overridden if desired. The difference between postprocess and cleanUp is that postprocess will only be called if the previous stages have executed successfully but cleanUp will always be called even if earlier stages have thrown an exception.

For our activity all we have to do in the preprocess method is validate the output and populate the BlockWriter member variable used to write the output:

protected void preprocess() throws ActivityUserException,
    ActivityProcessingException, ActivityTerminatedException
{
    validateOutput("output");
    mOutputBlockWriter = getOutput("output");
}

In the processIteration method we simply need to get the input, process it and write to the output:

protected void processIteration(Object[] iterationData)
    throws ActivityProcessingException, ActivityTerminatedException,
    ActivityUserException
{
    try
    {
        String name = (String) iterationData[0];
        mOutputBlockWriter.write("Hello " + name);
    }
    catch (PipeClosedException e)
    {
        // Consumer does not want any more data, just stop.
    }
    catch (PipeIOException e)
    {
        throw new ActivityPipeProcessingException(e);
    }
    catch (PipeTerminatedException e)
    {
        throw new ActivityTerminatedException();
    }
}

Note how the input value was obtained. The parameter to the method is an object array. Each object in this array corresponds to an input specified in the ActivityInput array returned from the getIterationInputs method. In that method we specified that this input should be of type String so it is safe to obtain it and cast it to a String with the line:

  String name = (String) iterationData[0];

The rest of this method is simply error handling identical to that used in the Hello World example.

Finally we have to write a postprocess method. As we have no post processing to do we can write an empty method:

/**
 * Post-processing.
 */
protected void postprocess() throws ActivityUserException,
    ActivityProcessingException, ActivityTerminatedException
{
    // No post-processing
}

The full implementation of the Hello Name activity implemented by extending the MatchedIterativeActivity base class is:

// Copyright (c) The University of Edinburgh, 2007.
//See OGSA-DAI-Licence.txt for licencing information.

package uk.org.ogsadai.tutorials.activity;

import uk.org.ogsadai.activity.ActivityProcessingException;
import uk.org.ogsadai.activity.ActivityTerminatedException;
import uk.org.ogsadai.activity.ActivityUserException;
import uk.org.ogsadai.activity.MatchedIterativeActivity;
import uk.org.ogsadai.activity.io.ActivityInput;
import uk.org.ogsadai.activity.io.ActivityPipeProcessingException;
import uk.org.ogsadai.activity.io.BlockWriter;
import uk.org.ogsadai.activity.io.PipeClosedException;
import uk.org.ogsadai.activity.io.PipeIOException;
import uk.org.ogsadai.activity.io.PipeTerminatedException;
import uk.org.ogsadai.activity.io.TypedActivityInput;

/**
 * Simple activity that receives names and outputs "Hello [name]" for each
 * name received.  In this case the activity is implemented by extending
 * MatchedIterativeActivity.
 *
 * @author The OGSA-DAI Project Team
 */
public class HelloName extends MatchedIterativeActivity
{
    /** Copyright notice */
    private static final String COPYRIGHT_NOTICE = 
        "Copyright (c) The University of Edinburgh,  2007.";

    /** Block writer used to write the activity's output */
    private BlockWriter mOutputBlockWriter;

    /**
     * Returns the details about the inputs.
     */
    protected ActivityInput[] getIterationInputs()
    {
        return new ActivityInput[] {
            new TypedActivityInput("input", String.class)};
    }

    /**
     * Pre-processing.
     */
    protected void preprocess() throws ActivityUserException,
        ActivityProcessingException, ActivityTerminatedException
    {
        validateOutput("output");
        mOutputBlockWriter = getOutput("output");
    }

    /**
     * Process a single iteration.
     */
    protected void processIteration(Object[] iterationData)
        throws ActivityProcessingException, ActivityTerminatedException,
        ActivityUserException
    {
        try
        {
            String name = (String) iterationData[0];
            mOutputBlockWriter.write("Hello " + name);
        }
        catch (PipeClosedException e)
        {
            // Consumer does not want any more data, just stop.
        }
        catch (PipeIOException e)
        {
            throw new ActivityPipeProcessingException(e);
        }
        catch (PipeTerminatedException e)
        {
            throw new ActivityTerminatedException();
        }
    }

    /**
     * Post-processing.
     */
    protected void postprocess() throws ActivityUserException,
        ActivityProcessingException, ActivityTerminatedException
    {
        // No post-processing
    }
}

As can be seen from these examples the MatchedIterativeActivity base class can make the development of activities that have inputs a little bit simpler than extending from the ActivityBase base class.

26.2. Developing the client-side activity

Having written a server-side activity we should now write a client-side activity class that allows Java applications to add the activity to workflows. Following the naming convention our client-side activity class should be called HelloName and should inherit from: uk.org.ogsadai.client.toolkit.activity.BaseActivity

The implementation is very similar to that of the Hello World client-side activity. The old difference is that we now have to handle the addition of a input. We need to declare a member variable for the input:

private ActivityInput mInput;

We now need to instantiate this input, along with the output, in the constructor:

public HelloName()
{
    super("uk.org.ogsadai.tutorial.HelloName");
    mInput = new SimpleActivityInput("input");
    mOutput = new SimpleActivityOutput("output");
}

Next we must return this input from the getInputs method:

protected ActivityInput[] getInputs()
{
    return new ActivityInput[]{mInput};
}

To allow client application developers to connect the input to the outputs of other activities we provide the following method:

/**
 * Connects the input to the given output.
 * 
 * @param output output to connect to.
 */
public void connectInput(SingleActivityOutput output)
{
    mInput.connect(output);
}

All inputs should have a method such as this. The method is generally called connectXInput where X is the name of the input. But when there is only a single input called input this method can be called connectInput as it is here.

Finally, we provide a method that allows the client developer to add values to the input directly rather than connect it to an output. This method is:

/**
 * Adds an value to the input.
 * 
 * @param name name to add to the input.
 */
public void addInput(String name)
{
    mInput.add(new StringData(name));
}

The methods corresponding to outputs are all much the same as they were in the Hello World example.

The complete client-toolkit activity implementation is:

// Copyright (c) The University of Edinburgh, 2007.
//See OGSA-DAI-Licence.txt for licencing information.

package uk.org.ogsadai.tutorial.activity.client;

import uk.org.ogsadai.activity.ActivityName;
import uk.org.ogsadai.client.toolkit.Activity;
import uk.org.ogsadai.client.toolkit.ActivityOutput;
import uk.org.ogsadai.client.toolkit.SingleActivityOutput;
import uk.org.ogsadai.client.toolkit.activity.ActivityInput;
import uk.org.ogsadai.client.toolkit.activity.BaseActivity;
import uk.org.ogsadai.client.toolkit.activity.SimpleActivityInput;
import uk.org.ogsadai.client.toolkit.activity.SimpleActivityOutput;
import uk.org.ogsadai.client.toolkit.exception.ActivityIOIllegalStateException;
import uk.org.ogsadai.client.toolkit.exception.DataSourceUsageException;
import uk.org.ogsadai.client.toolkit.exception.DataStreamErrorException;
import uk.org.ogsadai.client.toolkit.exception.UnexpectedDataValueException;
import uk.org.ogsadai.data.StringData;

/**
 * Client toolkit activity used to call the HelloName activity.
 *
 * @author The OGSA-DAI Project Team
 */
public class HelloName extends BaseActivity implements Activity
{
    /** Copyright notice */
    private static final String COPYRIGHT_NOTICE = 
        "Copyright (c) The University of Edinburgh,  2007.";
    
    /** Default activity name */
    public final static ActivityName DEFAULT_ACTIVITY_NAME = 
        new ActivityName("uk.org.ogsadai.tutorial.HelloName");

    /** The activity input */
    private ActivityInput mInput;

    /** The activity output */
    private ActivityOutput mOutput;

    /**
     * Constructor.
     */
    public HelloName()
    {
        super(DEFAULT_ACTIVITY_NAME);
        mInput = new SimpleActivityInput("input");
        mOutput = new SimpleActivityOutput("output");
    }
    
    /**
     * Adds an value to the input.
     * 
     * @param name name to add to the input.
     */
    public void addInput(String name)
    {
        mInput.add(new StringData(name));
    }
    
    /**
     * Connects the input to the given output.
     * 
     * @param output output to connect to.
     */
    public void connectInput(SingleActivityOutput output)
    {
        mInput.connect(output);
    }

    /**
     * Gets the output so that it can be connected to the input of other
     * activities.
     * 
     * @return the activity output.
     */
    public SingleActivityOutput getOutput()
    {
        return mOutput.getSingleActivityOutputs()[0];
    }
    
    /**
     * Gets if the activity has a next output value.
     * 
     * @return true if there is another output value, false otherwise.
     *         
     * @throws DataStreamErrorException 
     *             if there is an error on the data stream.
     * @throws UnexpectedDataValueException
     *             if there is an unexpected data value on the data stream.
     * @throws DataSourceUsageException
     *             if there is an error reading from a data source.
     */
    public boolean hasNextOutput()
        throws DataStreamErrorException, 
               UnexpectedDataValueException, 
               DataSourceUsageException
    {
        return mOutput.getDataValueIterator().hasNext();
    }
    
    /**
     * Gets the next output value.
     * 
     * @return the next output value.
     *         
     * @throws DataStreamErrorException 
     *             if there is an error on the data stream.
     * @throws UnexpectedDataValueException
     *             if there is an unexpected data value on the data stream.
     * @throws DataSourceUsageException
     *             if there is an error reading from a data source.
     */
    public String nextOutput()
        throws DataStreamErrorException, 
               UnexpectedDataValueException, 
               DataSourceUsageException
    {
        return mOutput.getDataValueIterator().nextAsString();
    }
    
    /**
     * Gets the activity inputs.
     */
    protected ActivityInput[] getInputs()
    {
        return new ActivityInput[]{mInput};
    }

    /**
     * Gets the activity outputs.
     */
    protected ActivityOutput[] getOutputs()
    {
        return new ActivityOutput[]{mOutput};
    }

    /**
     * Validates the data of the inputs and outputs.
     */
    protected void validateIOState() throws ActivityIOIllegalStateException
    {
        // No further validation to do
    }
}

26.3. Developing the client application

An example client application that uses this activity is shown here. You may have to change the base services URL in the main method depending on your server. By default this is:

  • OGSA-DAI Axis Default: http://localhost:8080/dai/services/

// Copyright (c) The University of Edinburgh, 2007.
// See OGSA-DAI-Licence.txt for licencing information.

package uk.org.ogsadai.tutorial.activity.client.apps;

import java.net.URL;

import uk.org.ogsadai.client.toolkit.DataRequestExecutionResource;
import uk.org.ogsadai.client.toolkit.PipelineWorkflow;
import uk.org.ogsadai.client.toolkit.RequestExecutionType;
import uk.org.ogsadai.client.toolkit.ServerProxy;
import uk.org.ogsadai.client.toolkit.activities.delivery.DeliverToRequestStatus;
import uk.org.ogsadai.client.toolkit.exception.RequestExecutionException;
import uk.org.ogsadai.resource.ResourceID;
import uk.org.ogsadai.tutorial.activity.client.HelloName;

/**
 * Application for the HelloName activity tutorial.
 *
 * @author The OGSA-DAI Project Team
 */
public class HelloNameApp
{
    /** Copyright notice */
    private static final String COPYRIGHT_NOTICE = 
        "Copyright (c) The University of Edinburgh,  2007.";

    /**
     * Main method.
     * 
     * @param args command line arguments.
     * 
     * @throws Exception if an unexpected error occurs
     */
    public static void main(String[] args) throws Exception
    {
        URL serverBaseUrl = new URL("http://localhost:8080/wsrf/services/dai/");
        ResourceID drerId = new ResourceID("DataRequestExecutionResource");
        
        ServerProxy serverProxy = new ServerProxy();
        serverProxy.setDefaultBaseServicesURL(serverBaseUrl);
        
        DataRequestExecutionResource drer = 
            serverProxy.getDataRequestExecutionResource(drerId);
        
        // Create the activities
        HelloName helloName = new HelloName();
        DeliverToRequestStatus deliverToRequestStatus = 
            new DeliverToRequestStatus();
        
        // Add some names to the input to the HelloName activity
        helloName.addInput("Rod");
        helloName.addInput("Jane");
        helloName.addInput("Freddy");
        
        // Connect the output of HelloName to DeliverToRequestStatus
        deliverToRequestStatus.connectInput(helloName.getOutput());
        
        // Create the workflow
        PipelineWorkflow pipeline = new PipelineWorkflow();
        pipeline.add(helloName);
        pipeline.add(deliverToRequestStatus);
        
        // Excecute the workflow
        try
        {
            drer.execute(pipeline, RequestExecutionType.SYNCHRONOUS);
        }
        catch( RequestExecutionException e)
        {
            System.out.println("There was an error executing the workflow");
            System.out.println(e.getRequestResource().getRequestStatus());
            throw e;
        }
        
        // Get the result and display it
        while( helloName.hasNextOutput())
        {
            System.out.println(helloName.nextOutput());
        }
    }
}

Note that you may need to alter the value used for the server base URL to correspond to your server installation. The output from this client is:

Hello Rod
Hello Jane
Hello Freddy

To get this client to successfully run you must install the server-side activity on your server. To do this compile one of the two server-side activity classes to a jar, for example, HelloName.jar then follow the instructions at Section 16.1.8, “Deploying an activity” and Section 16.1.9, “Extending the supported activities of a resource”. When following these instructions use the following values:

  • dai.activity.id: uk.org.ogsadai.tutorial.HelloName
  • dai.activity.name: uk.org.ogsadai.tutorial.HelloName
  • dai.activity.class: uk.org.ogsadai.tutorials.activity.HelloNameActivity
  • dai.activity.description: "Hello Name activity"