In this chapter we provide examples of how to realize various data access and integration scenarios using the client toolkit as well as providing other useful information.
This section shows how to implement the scenario described in Section 7.3, “Targetting an SQL query at multiple data resources using a resource group”, which executes an SQL query across a set of resources.
Before you can target an SQL query at multiple resources it is necessary to create a resource group. The CreateResourceGroup activity creates resource groups. It expects the IDs of the resources in the group as inputs.
For this example there are three resources we would like to group together as a resource group.
String id1 = "Resource1";
String id2 = "Resource2";
String id3 = "Resource3";
The IDs of the resources are placed into a string array and passed to the CreateResourceGroup activity as a parameter.
String[] resources = new String[]{ id1, id2, id3 };
CreateResourceGroup create = new CreateResourceGroup();
create.addResourceIdsAsArray(resources);
The following listing shows a request in which the output of the CreateResourceGroup activity, which is the resource ID of the newly created resource, is delivered to the client.
DeliverToRequestStatus deliverToRequestStatus1 = new DeliverToRequestStatus();
deliverToRequestStatus1.connectInput(create.getOutput());
PipelineWorkflow createWorkflow = new PipelineWorkflow();
createWorkflow.add(create);
createWorkflow.add(deliverToRequestStatus1);
Once the request has been executed the resource ID of the group is available from the CreateResourceGroup results:
drer.execute(createWorkflow, RequestExecutionType.SYNCHRONOUS);
ResourceID groupID = create.nextResult();
This new resource can now be used as a target for an SQLBag query which executes the same SQL query against all the resources in the resource group, that is against resources Resource1, Resource2 and Resource3.
We create a new request with an SQLBag query. In the first step, a new instance of the SQLBag activity is populated. Its target resource is set to the new resource group created above. An SQL query expression is added to the parameters.
SQLBag query = new SQLBag();
query.setResourceID(groupID);
query.addExpression("SELECT * FROM littleblackbook WHERE id<3");
More than one SQL expression can be added with the method shown above in order to execute several SQL queries within the same request. Each of the queries is run across each of the resources in the group and produces a separate result set.
Warning
The current implementation of the SQLBag activity has a timeout input. This input is now regarded as a bad idea and will be removed from the next release. If you are getting errors due to SQLBag timing out you can set the timeout to a larger value to prevent timeouts. For example:
query.addTimeout(Integer.MAX_VALUE);
To further complicate matters there is a bug in the ListConcatenation activity used within SQLBag that will lead to an infinite loop if all the resources in the resource group produce an error when given the query expression. If you are planning to make extensive use of SQLBag before the next release of OGSA-DAI then please contact the OGSA-DAI team and we will be happy to advise you.
In our example, the results of the query are streamed to the TupleToWebRowSetCharArrays activity to serialise them as XML and are then delivered to the request status.
TupleToWebRowSetCharArrays tupleToWebRowSet = new TupleToWebRowSetCharArrays();
tupleToWebRowSet.connectDataInput(query.getDataOutput());
DeliverToRequestStatus deliverToRequestStatus = new DeliverToRequestStatus();
deliverToRequestStatus.connectInput(tupleToWebRowSet.getResultOutput());
PipelineWorkflow pipeline = new PipelineWorkflow();
pipeline.add(query);
pipeline.add(tupleToWebRowSet);
pipeline.add(deliverToRequestStatus);
After executing the above request, in which the results are delivered back to
the client, the results can be retrieved from the TupleToWebRowSetCharArrays
activity.
For each SQL expression that was added to the SQLBag activity there will be one
result set available. Each call to
TupleToWebRowSetCharArrays.nextResultAsResultSet()
returns a java.sql.ResultSet object providing
row-based access to the results.
drer.execute(pipeline, RequestExecutionType.SYNCHRONOUS);
while (tupleToWebRowSet.hasNextResult())
{
ResultSet rs = tupleToWebRowSet.nextResultAsResultSet();
// iterate through the rows of the result set
while (rs.next())
{
... // do something with the result values
}
}
Clients can use the normal Globus Toolkit security constants to
specify the type and level of security required between the client and
the server. The Resource interface
contains the addServerCommsProperty
method. This method can be used to set the security properties for
any resource.
To specify that the client must communicate with the server using GSI secure conversation with privacy (encryption), full delegation and host authorization the following code should be used (with appropriate changes to the URL if required).
import org.globus.axis.gsi.GSIConstants;
import org.globus.wsrf.impl.security.authentication.Constants;
import org.globus.wsrf.impl.security.authorization.HostAuthorization;
GTServer server = new GTServer();
server.setDefaultBaseServicesURL("http://my.server.org:8080/wsrf/services/dai/");
DataRequestExecutionResource drer =
server.getDataRequestExecutionResource(
new ResourceID("DataRequestExecutionResource"));
// Set security
drer.addServerCommsProperty(
Constants.GSI_SEC_CONV, Constants.ENCRYPTION);
drer.addServerCommsProperty(
GSIConstants.GSI_MODE, GSIConstants.GSI_MODE_FULL_DELEG);
drer.addServerCommsProperty(
Constants.AUTHORIZATION, HostAuthorization.getInstance());
If the user has a valid grid proxy certificate then the application will automatically use it if the Globus Toolkit client set up is configured correctly.
To specify that the client must communicate with the server using GSI secure message with integrity (signature) and host authorization the following code should be used (with appropriate changes to the URL and resource ID if required).
import org.globus.axis.gsi.GSIConstants;
import org.globus.wsrf.impl.security.authentication.Constants;
import org.globus.wsrf.impl.security.authorization.HostAuthorization;
GTServer server = new GTServer();
server.setDefaultBaseServicesURL("http://my.server.org:8080/wsrf/services/dai/");
DataRequestExecutionResource drer =
server.getDataRequestExecutionResource(
new ResourceID("DataRequestExecutionResource"));
// Set security
drer.addServerCommsProperty(
Constants.GSI_SEC_MSG, Constants.SIGNATURE);
drer.addServerCommsProperty(
Constants.AUTHORIZATION, HostAuthorization.getInstance());
To specify that the client must communicate with the server using GSI secure message with privacy (encryption) the following code should be used (with appropriate changes to the URL, resourceID and public key filename if required).
import java.security.cert.X509Certificate;
import javax.security.auth.Subject;
import org.globus.gsi.CertUtil;
import org.globus.wsrf.impl.security.authentication.Constants;
import org.globus.wsrf.impl.security.authentication.encryption.EncryptionCredentials;
GTServer server = new GTServer();
server.setDefaultBaseServicesURL("http://my.server.org:8080/wsrf/services/dai/");
DataRequestExecutionResource drer =
server.getDataRequestExecutionResource(
new ResourceID("DataRequestExecutionResource"));
// Set security
drer.addServerCommsProperty(
Constants.GSI_SEC_MSG, Constants.ENCRYPTION);
String publicKeyFilename = "/path/to/server/public/key/serverKey.pem";
Subject subject = new Subject();
X509Certificate serverCert =
CertUtil.loadCertificate(publicKeyFilename);
EncryptionCredentials encryptionCreds =
new EncryptionCredentials(new X509Certificate[] { serverCert });
subject.getPublicCredentials().add(encryptionCreds);
drer.addServerCommsProperty(Constants.PEER_SUBJECT, subject);
As this example shows, using encryption with GSI secure message requires the client to provide the public key of the server so that the message can be encrypted using this key.
To specify that transport level security is to be used with integrity and host authorization the following code should be used (with appropriate changes to the URL and resourceID if required).
import org.globus.axis.gsi.GSIConstants;
import org.globus.wsrf.impl.security.authentication.Constants;
import org.globus.wsrf.impl.security.authorization.HostAuthorization;
import org.globus.axis.util.Util;
// Register transport level security
static
{
Util.registerTransport();
}
GTServer server = new GTServer();
server.setDefaultBaseServicesURL("https://my.server.org:8443/wsrf/services/dai/");
DataRequestExecutionResource drer =
server.getDataRequestExecutionResource(
new ResourceID("DataRequestExecutionResource"));
// Set security
drer.addServerCommsProperty(
Constants.GSI_TRANSPORT, Constants.SIGNATURE);
drer.addServerCommsProperty(
Constants.AUTHORIZATION, HostAuthorization.getInstance());
For privacy (encryption) the
Constants.ENCRYPTION constant should
be used instead of
Constants.SIGNATURE. Notice that
before using transport level security it is important to ensure that
Util.registerTransport() has been
called from a static block. It is
also essential to ensure the service URL specifies
https rather than
http.
The appropriate security settings must be applied to every
Resource object before use. For
example, if a data request execution resource returns a request
resource then the required security properties for communicating with
the request resource must be explicitly set before the resource is
used. Child resources such as this do not inherit the security
properties of their parent.
The example below requests the recursive list of files in the
Phase1 directory within the file
system resource with resource ID
fileResourceID. The default
includePath value is assumed to be
set. The list is then printed.
// Build the workflow
PipelineWorkflow pipeline = new PipelineWorkflow();
// Build the activities and activity chain
ListDirectory listDirectory = new ListDirectory();
listDirectory.setResourceID(fileResourceID);
listDirectory.addDirectory("/Phase1");
listDirectory.addRecursive(true);
pipeline.add(listDirectory);
DeliverToRequestStatus deliverToRequestStatus = new DeliverToRequestStatus();
deliverToRequestStatus.connectInput(listDirectory.getDataOutput());
pipeline.add(deliverToRequestStatus);
RequestResource requestResource =
mDRER.execute(pipeline, RequestExecutionType.SYNCHRONOUS);
RequestStatus status = requestResource.getRequestStatus();
if (listDirectory.hasNextData()) {
DataIterator dataList = listDirectory.nextData();
while (dataList.hasNext()) {
String directoryEntry = (String) dataList.next();
System.out.println(directoryEntry);
}
}
The following example shows how to read a whole file called
MachineData.
// Build the workflow
PipelineWorkflow pipeline = new PipelineWorkflow();
// Build the activities and activity chain
ReadFromFile readFromFile = new ReadFromFile();
readFromFile.setResourceID(fileResourceID);
readFromFile.addFile("MachineData");
pipeline.add(readFromFile);
DeliverToRequestStatus deliverToRequestStatus = new DeliverToRequestStatus();
deliverToRequestStatus.connectInput(readFromFile.getDataOutput());
pipeline.add(deliverToRequestStatus);
RequestResource requestResource = mDRER.execute(pipeline,
RequestExecutionType.SYNCHRONOUS);
RequestStatus status = requestResource.getRequestStatus();
if (readFromFile.hasNextData()) {
    DataValueInputStream inputStream =
        (DataValueInputStream)readFromFile.getNextData();
    // Read in fixed-size chunks: available() may return 0 before the
    // end of the stream, so it is not a reliable buffer size.
    byte[] fileContent = new byte[4096];
    int numberOfBytesRead = inputStream.read(fileContent);
    while (numberOfBytesRead != -1) {
        // Print out only the bytes that were actually read
        System.out.print(new String(fileContent, 0, numberOfBytesRead));
        // read more data from the stream
        numberOfBytesRead = inputStream.read(fileContent);
    }
}
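The read loop above handles each chunk as it arrives. If the whole file is wanted as a single value, the chunks can be accumulated first. The following is a minimal sketch using only the standard library. It assumes the stream returned by the activity behaves like any java.io.InputStream (the listing above calls available() and read(byte[]) on it). The StreamContents class, the 4096-byte buffer and the UTF-8 charset are illustrative choices and are not part of the OGSA-DAI client toolkit.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of the OGSA-DAI client toolkit.
class StreamContents {

    // Reads the stream to its end and returns everything that was read.
    static byte[] readFully(InputStream in) {
        ByteArrayOutputStream collected = new ByteArrayOutputStream();
        byte[] buffer = new byte[4096];
        int n;
        try {
            // read() returns -1 only at end of stream; it may fill less
            // than the whole buffer, so only the first n bytes are kept.
            while ((n = in.read(buffer)) != -1) {
                collected.write(buffer, 0, n);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return collected.toByteArray();
    }

    public static void main(String[] args) {
        // Stand-in for the stream obtained from ReadFromFile.
        InputStream in = new ByteArrayInputStream(
            "machine data".getBytes(StandardCharsets.UTF_8));
        System.out.println(
            new String(readFully(in), StandardCharsets.UTF_8));
    }
}
```

Decoding the bytes only once, after the whole stream has been read, avoids splitting a multi-byte character across two chunks.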
The following example shows how to execute an SQL query, convert the resulting list of tuples into CSV format and deliver this to a list of recipients via SMTP.
Initially an SQLQuery activity is created which executes an SQL query
against a relational resource with resource ID, let us say,
MySQLResource.
String expression = "SELECT id, name, address, phone FROM littleblackbook where id>2 and id<12 order by id";
SQLQuery query = new SQLQuery();
query.setResourceID(MySQLResource);
query.addExpression(expression);
Then the output of the SQLQuery activity is connected to the input of a TupleToCSV activity.
TupleToCSV tupleToCSV = new TupleToCSV();
tupleToCSV.connectDataInput(query.getDataOutput());
Then this transformation activity is connected to a DeliverToSMTP activity. This activity has three additional parameters: the subject of the email, the sender and a list of recipients.
DeliverToSMTP deliverToSMTP = new DeliverToSMTP();
deliverToSMTP.connectDataInput(tupleToCSV.getResultOutput());
deliverToSMTP.addFrom("senderName@mySMTPserver.com");
deliverToSMTP.addSubject("Results from SQL query");
List to = new ArrayList();
to.add("recipientName1@anotherSMTPserver.com");
to.add("recipientName2@anotherSMTPserver.com");
deliverToSMTP.addTo(to.iterator());
The last step is to form a workflow based on these activities and execute it by submitting it to the OGSA-DAI server. Note that since there is nothing to be delivered back to the client there is no need for a DeliverToRequestStatus activity.
// Build the workflow
PipelineWorkflow pipeline = new PipelineWorkflow();
pipeline.add(query);
pipeline.add(tupleToCSV);
pipeline.add(deliverToSMTP);
RequestResource requestResource = mDRER.execute(pipeline,
RequestExecutionType.SYNCHRONOUS);
RequestStatus status = requestResource.getRequestStatus();
The following example is a variation of the previous one with the only difference being that the produced tuples from SQLQuery are transformed into WebRowSet format instead of CSV.
String expression = "SELECT id, name, address, phone FROM littleblackbook where id>2 and id<12 order by id";
SQLQuery query = new SQLQuery();
query.setResourceID(MySQLResource);
query.addExpression(expression);
TupleToWebRowSetCharArrays tupleToWebRowSet = new TupleToWebRowSetCharArrays();
tupleToWebRowSet.connectDataInput(query.getDataOutput());
DeliverToSMTP deliverToSMTP = new DeliverToSMTP();
deliverToSMTP.connectDataInput(tupleToWebRowSet.getResultOutput());
deliverToSMTP.addFrom("senderName@mySMTPserver.com");
deliverToSMTP.addSubject("Results from SQL query");
List to = new ArrayList();
to.add("recipientName1@anotherSMTPserver.com");
to.add("recipientName2@anotherSMTPserver.com");
deliverToSMTP.addTo(to.iterator());
// Build the workflow
PipelineWorkflow pipeline = new PipelineWorkflow();
pipeline.add(query);
pipeline.add(tupleToWebRowSet);
pipeline.add(deliverToSMTP);
RequestResource requestResource = mDRER.execute(pipeline,
RequestExecutionType.SYNCHRONOUS);
RequestStatus status = requestResource.getRequestStatus();
The following example is a variation of the previous one with the only difference being that the WebRowSet is delivered to an FTP server instead of via SMTP.
The DeliverToFTP activity takes the data to deliver to the FTP server from the output of the TupleToWebRowSetCharArrays activity. It has two additional parameters: the URL of the FTP server and the name of the file that the data will be written to.
DeliverToFTP deliverToFTP = new DeliverToFTP();
deliverToFTP.connectDataInput(tupleToWebRowSet.getResultOutput());
deliverToFTP.addFilename("slqResults.xml");
deliverToFTP.addHost("ftp://user1:pass@myftpserver.ed.ac.uk:21");
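The host string in the snippet above packs the user name, password, server name and port into one URL. As a rough sketch, such a string can be assembled and taken apart again with the standard java.net.URI class before it is handed to the activity. The FtpHost class and its method are hypothetical and exist only for this illustration. Whether DeliverToFTP itself validates the string is not stated in this guide.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical helper, not part of the OGSA-DAI client toolkit.
class FtpHost {

    // Builds "ftp://user:password@host:port" from its parts.
    static String build(String user, String password, String host, int port) {
        try {
            // Arguments: scheme, user info, host, port, path, query, fragment.
            URI uri = new URI("ftp", user + ":" + password, host, port,
                              null, null, null);
            return uri.toString();
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Invalid FTP host parts", e);
        }
    }

    public static void main(String[] args) {
        String url = build("user1", "pass", "myftpserver.ed.ac.uk", 21);
        System.out.println(url);

        // The same class can split an existing URL into its components.
        URI parsed = URI.create(url);
        System.out.println(parsed.getUserInfo());
        System.out.println(parsed.getHost());
        System.out.println(parsed.getPort());
    }
}
```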
The following code implements the scenario:
String expression = "SELECT id, name, address, phone FROM littleblackbook where id>2 and id<12 order by id";
SQLQuery query = new SQLQuery();
query.setResourceID(MySQLResource);
query.addExpression(expression);
TupleToWebRowSetCharArrays tupleToWebRowSet = new TupleToWebRowSetCharArrays();
tupleToWebRowSet.connectDataInput(query.getDataOutput());
DeliverToFTP deliverToFTP = new DeliverToFTP();
deliverToFTP.connectDataInput(tupleToWebRowSet.getResultOutput());
deliverToFTP.addFilename("slqResults.xml");
deliverToFTP.addHost("ftp://user1:pass@myftpserver.ed.ac.uk:21");
// Build the workflow
PipelineWorkflow pipeline = new PipelineWorkflow();
pipeline.add(query);
pipeline.add(tupleToWebRowSet);
pipeline.add(deliverToFTP);
RequestResource requestResource = mDRER.execute(pipeline,
RequestExecutionType.SYNCHRONOUS);
RequestStatus status = requestResource.getRequestStatus();
This scenario shows how to execute an SQL query against a relational resource, transform the results into WebRowSet format and then populate a data source with this data.
Before you can use a data source it is necessary to create one. The CreateDataSource activity creates data sources.
The following listing shows a request in which the output of the CreateDataSource activity, which is the resource ID of the newly created data source, is delivered to the client. Once the request has been executed the resource ID of the data source is available from the CreateDataSource results.
CreateDataSource createDataSource = new CreateDataSource();
DeliverToRequestStatus deliverToRequestStatus = new DeliverToRequestStatus();
deliverToRequestStatus.connectInput(createDataSource.getResultOutput());
//Build the workflow
PipelineWorkflow createWorkflow = new PipelineWorkflow();
createWorkflow.add(createDataSource);
createWorkflow.add(deliverToRequestStatus);
RequestResource requestResource = mDRER.execute(createWorkflow,
RequestExecutionType.SYNCHRONOUS);
RequestStatus status = requestResource.getRequestStatus();
// Get the data source ID
ResourceID dataSourceID = createDataSource.nextResult();
This new data source can now be populated with the transformed output of an SQLQuery executed against a relational resource.
String expression = "SELECT id, name, address, phone FROM littleblackbook where id>2 and id<12 order by id";
SQLQuery query = new SQLQuery();
query.setResourceID(MySQLResource);
query.addExpression(expression);
TupleToWebRowSetCharArrays transform = new TupleToWebRowSetCharArrays();
transform.connectDataInput(query.getDataOutput());
WriteToDataSource delivery = new WriteToDataSource();
delivery.setResourceID(dataSourceID);
delivery.connectInput(transform.getResultOutput());
PipelineWorkflow pipeline = new PipelineWorkflow();
pipeline.add(query);
pipeline.add(transform);
pipeline.add(delivery);
RequestResource requestResource = mDRER.execute(pipeline,
RequestExecutionType.SYNCHRONOUS);
RequestStatus status = requestResource.getRequestStatus();
This data can then be pulled from the OGSA-DAI server via a data source service provided that the data source is still alive. Alternatively the client can use the data source in another workflow via use of the ObtainFromDataSource activity.
The following example shows how to read a file called
MachineData and then deliver its data
to a list of SMTP recipients and an FTP server. In order to duplicate
the output of ReadFromFile activity so that both DeliverToFTP and
DeliverToSMTP activities can consume it, the Tee activity is used
which clones its inputs.
// Build the activities and activity chain
ReadFromFile readFromFile = new ReadFromFile();
readFromFile.setResourceID(fileResourceID);
readFromFile.addFile("MachineData");
Tee tee = new Tee();
tee.connectInput(readFromFile.getDataOutput());
tee.setNumberOfOutputs(2);
DeliverToFTP deliverToFTP = new DeliverToFTP();
deliverToFTP.connectDataInput(tee.getOutput(0));
deliverToFTP.addFilename("slqResults.xml");
deliverToFTP.addHost("ftp://user1:pass@myftpserver.ed.ac.uk:21");
DeliverToSMTP deliverToSMTP = new DeliverToSMTP();
deliverToSMTP.connectDataInput(tee.getOutput(1));
deliverToSMTP.addFrom("senderName@mySMTPserver.com");
deliverToSMTP.addSubject("Results from SQL query");
List to = new ArrayList();
to.add("recipientName1@anotherSMTPserver.com");
to.add("recipientName2@anotherSMTPserver.com");
deliverToSMTP.addTo(to.iterator());
// Build the workflow
PipelineWorkflow pipeline = new PipelineWorkflow();
pipeline.add(readFromFile);
pipeline.add(tee);
pipeline.add(deliverToFTP);
pipeline.add(deliverToSMTP);
RequestResource requestResource = mDRER.execute(pipeline,
RequestExecutionType.SYNCHRONOUS);
RequestStatus status = requestResource.getRequestStatus();
This scenario shows how the results of an SQLQuery against a relational resource can be used to perform a bulk load against another resource that is exposed by the OGSA-DAI server.
Between these two activities some transformations need to be performed. First the output of the SQLQuery activity is linked to the input of a TupleToWebRowSetCharArrays activity. Then this output passes through an XSLTransform activity which transforms the WebRowSet into a representation similar to CSV, based on an XSL file which is provided as a parameter.
String expression = "SELECT id, name, address, phone FROM littleblackbook where id>2 and id<12 order by id";
SQLQuery query = new SQLQuery();
query.setResourceID(MySQLResource);
query.addExpression(expression);
TupleToWebRowSetCharArrays tupleToWebRowSet = new TupleToWebRowSetCharArrays();
tupleToWebRowSet.connectDataInput(query.getDataOutput());
Reader XSLreader = new FileReader("bulkLoad.xsl");
XSLTransform xsl = new XSLTransform();
xsl.connectXMLInput(tupleToWebRowSet.getResultOutput());
xsl.addXSLT(XSLreader);
Next, the CSV is transformed into a list of tuples using the CSVToTuple activity, and this list of tuples is then bulk loaded into a table of another relational resource.
CSVToTuple csvToTuples = new CSVToTuple();
csvToTuples.connectDataInput(xsl.getResultOutput());
csvToTuples.addHeaderIncluded(true);
SQLBulkLoadTuple loadTuples = new SQLBulkLoadTuple();
loadTuples.setResourceID(PostgreSQLResource);
loadTuples.connectDataInput(csvToTuples.getResultOutput());
loadTuples.addTableName("anotherlittleblackbook");
The final part of this scenario involves executing the request and getting the count of bulk loaded tuples from the results of the SQLBulkLoadTuple activity. To facilitate this we need to use a DeliverToRequestStatus activity.
DeliverToRequestStatus deliverToRequestStatus = new DeliverToRequestStatus();
deliverToRequestStatus.connectInput(loadTuples.getDataOutput());
// Build the workflow
PipelineWorkflow pipeline = new PipelineWorkflow();
pipeline.add(query);
pipeline.add(tupleToWebRowSet);
pipeline.add(xsl);
pipeline.add(csvToTuples);
pipeline.add(loadTuples);
pipeline.add(deliverToRequestStatus);
RequestResource requestResource = mDRER.execute(pipeline,
RequestExecutionType.SYNCHRONOUS);
RequestStatus status = requestResource.getRequestStatus();
System.out.println(loadTuples.nextResult());
This scenario has a similar philosophy to the previous one, which is to retrieve some data from a resource and then insert it into another one. However, in this case the initial query is against an XML resource instead of a relational one. Furthermore, the output of the XPath execution is also transformed and delivered to an FTP server.
It is worth noting that the data input of DeliverToFTP is a list of
char[], in contrast to Section 7.8, “Inter-FTP server transfer”, where it is a list of
byte[]. DeliverToFTP can handle both
binary and character data coming into its data input.
Another notable point is that an SQLParameterisedUpdate activity is
used, which enables execution of any parameterised SQL statement, in contrast
to the bulk load that was used in the previous scenario (Section 23.10, “Executing an SQL query, performing necessary transformations and bulk loading another relational resource”).
After the execution of
the request this activity returns a list of
int. Each of its elements provides
the update count of the SQL execution with the corresponding input
tuple being the parameter.
The following listing shows the implementation of this scenario.
XPathQuery xpathStatement = new XPathQuery();
xpathStatement.setResourceID(eXistResource);
String xpath = "/entry[@id=23]";
xpathStatement.addExpression(xpath);
Tee tee = new Tee();
tee.connectInput(xpathStatement.getDataOutput());
tee.setNumberOfOutputs(2);
//FTP delivery
Reader XSLTuplereader = new FileReader("Resource2HTML.xsl");
XSLTransform transform2HTML = new XSLTransform();
transform2HTML.addXSLT(XSLTuplereader);
transform2HTML.connectXMLInput(tee.getOutput(0));
DeliverToFTP dftp = new DeliverToFTP();
dftp.addFilename("test-files/results2.html");
dftp.connectDataInput(transform2HTML.getResultOutput());
dftp.addPassiveMode(true);
dftp.addHost("ftp://user1:pass@myftpserver.ed.ac.uk:21");
//Database update
Reader XSLreader = new FileReader("Resource2Update.xsl");
XSLTransform xsl = new XSLTransform();
xsl.addXSLT(XSLreader);
xsl.connectXMLInput(tee.getOutput(1));
WebRowSetCharacterDataToTuple webrowsetToTuples = new WebRowSetCharacterDataToTuple();
webrowsetToTuples.connectDataInput(xsl.getResultOutput());
SQLParameterisedUpdate sqlUpdate = new SQLParameterisedUpdate();
sqlUpdate.setResourceID(MySQLResource);
String sql =
"INSERT INTO testtransfer(id,name,address,phone) VALUES(?,?,?,?)";
sqlUpdate.addExpression(sql);
sqlUpdate.connectParametersInput(webrowsetToTuples.getResultOutput());
DeliverToRequestStatus deliverTorequestStatus = new DeliverToRequestStatus();
deliverTorequestStatus.connectInput(sqlUpdate.getResultOutput());
// Workflow.
PipelineWorkflow pipeline = new PipelineWorkflow();
pipeline.add(xpathStatement);
pipeline.add(xsl);
pipeline.add(transform2HTML);
pipeline.add(tee);
pipeline.add(dftp);
pipeline.add(webrowsetToTuples);
pipeline.add(sqlUpdate);
pipeline.add(deliverTorequestStatus);
RequestResource requestResource = mDRER.execute(pipeline,
RequestExecutionType.SYNCHRONOUS);
RequestStatus status = requestResource.getRequestStatus();
int[] results = sqlUpdate.nextResultAsArray();
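Because each element of the returned array is the update count for one input tuple, a client may want a quick summary of the whole batch. The following sketch, using only the standard library, totals the counts and reports how many tuples changed no rows. The UpdateCounts class and its method names are hypothetical, and the sample array stands in for the value returned by nextResultAsArray().

```java
import java.util.stream.IntStream;

// Hypothetical helper, not part of the OGSA-DAI client toolkit.
class UpdateCounts {

    // Total number of rows affected across all input tuples.
    static int totalRows(int[] counts) {
        return IntStream.of(counts).sum();
    }

    // Number of input tuples whose statement affected no rows.
    static long tuplesWithNoEffect(int[] counts) {
        return IntStream.of(counts).filter(c -> c == 0).count();
    }

    public static void main(String[] args) {
        // Stand-in for: int[] results = sqlUpdate.nextResultAsArray();
        int[] results = { 1, 1, 0, 1 };
        System.out.println("Rows affected: " + totalRows(results));
        System.out.println("Tuples with no effect: "
            + tuplesWithNoEffect(results));
    }
}
```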
This scenario is a variation of the scenario in Section 7.8, “Inter-FTP server transfer”. The differences are that, apart from the filenames, the SQL execution returns some extra column data that we want to return to the client, and that the data from the files is read from a file resource rather than obtained from an FTP server. The following figure depicts the scenario.
The first part of the implementation involves the SQL query execution
and the tuple projection. We want to use the data of the
filename column separately from the
other columns (let's say that these refer to some geometrical coordinates),
i.e. min_x, min_y, max_x, max_y and step_size.
We therefore need two instances of
TupleProjection, and so have to duplicate the output of
SQLQuery by using the Tee activity. The columns that we want to project
are provided by the client via the CSVToTuple activity.
SQLQuery query = new SQLQuery();
CSVToTuple csvtotupleCoordinates = new CSVToTuple();
CSVToTuple csvtotupleFilename = new CSVToTuple();
TupleProjection coords = new TupleProjection();
TupleProjection filename = new TupleProjection();
Tee teeActivity = new Tee();
String sqlquery = "SELECT filename,min_x,min_y,max_x,max_y,step_size FROM mytable where id<2";
query.addExpression(sqlquery);
query.setResourceID(MySQLResource);
//csv to tuple for coordinates
csvtotupleCoordinates.addData(new CharArrayReader(
"min_x\nmin_y\nmax_x\nmax_y\nstep_size\n".toCharArray()));
//csv to tuple for filename
csvtotupleFilename.addData(new CharArrayReader("filename\n".toCharArray()));
teeActivity.setNumberOfOutputs(2);
teeActivity.connectInput(query.getDataOutput());
coords.connectDataInput(teeActivity.getOutput(0));
coords.connectColumnIDsInput(csvtotupleCoordinates.getResultOutput());
filename.connectDataInput(teeActivity.getOutput(1));
filename.connectColumnIDsInput(csvtotupleFilename.getResultOutput());
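In the listing above the column names are passed to CSVToTuple as a hand-written string with one name per line. When the set of columns is only known at run time, the same string can be generated from a list. This is a minimal sketch using only the standard library. The ColumnList class and its methods are hypothetical, and the one-name-per-line layout is simply copied from the literal used above.

```java
import java.io.CharArrayReader;
import java.io.Reader;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of the OGSA-DAI client toolkit.
class ColumnList {

    // One column name per line, each line terminated by '\n'.
    static String toLines(List<String> names) {
        StringBuilder sb = new StringBuilder();
        for (String name : names) {
            sb.append(name).append('\n');
        }
        return sb.toString();
    }

    // Wraps the generated text in a Reader, as done in the listing above.
    static Reader asReader(List<String> names) {
        return new CharArrayReader(toLines(names).toCharArray());
    }

    public static void main(String[] args) {
        List<String> coords = Arrays.asList(
            "min_x", "min_y", "max_x", "max_y", "step_size");
        System.out.print(toLines(coords));
    }
}
```

The Reader returned by asReader could then be passed where the listing passes its CharArrayReader.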
Then the tuples containing the "coordinates" columns need to be returned to the client.
TupleToCSV tupletocsv = new TupleToCSV();
DeliverToRequestStatus delivertorequeststatus = new DeliverToRequestStatus();
//tupletocsv
tupletocsv.connectDataInput(coords.getDataOutput());
//delivertorequeststatus
delivertorequeststatus.connectInput(tupletocsv.getResultOutput());
At the same time the tuples containing the filenames need to be transformed, and the files with these names then need to be read from a file resource.
TupleSplit tuplesplit = new TupleSplit();
ListRemove listremover = new ListRemove();
Tee tee = new Tee();
ReadFromFile readfromfile = new ReadFromFile();
ControlledRepeat controlledrepeater = new ControlledRepeat();
DeliverToFTP delivertoftp = new DeliverToFTP();
//tuplesplit
tuplesplit.connectDataInput(filename.getDataOutput());
//listremover
listremover.connectInput(tuplesplit.getResultOutput(0));
//tee
tee.setNumberOfOutputs(2);
tee.connectInput(listremover.getOutput());
//readfromfile
readfromfile.connectFileInput(tee.getOutput(0));
readfromfile.setResourceID(fileResource);
//controlledrepeater
controlledrepeater.addRepeatedInput("ftp://user1:pass@myftpserver.ed.ac.uk:21");
controlledrepeater.connectInput(tee.getOutput(1));
//delivertoftp
delivertoftp.connectDataInput(readfromfile.getDataOutput());
delivertoftp.connectFilenameInput(controlledrepeater.getOutput());
delivertoftp.connectHostInput(controlledrepeater.getRepeatedOutput());
The final part of the implementation is to execute the request and get the coordinates through the results of TupleToCSV activity.
//Workflow
PipelineWorkflow deliverPipeline = new PipelineWorkflow();
deliverPipeline.add(tupletocsv);
deliverPipeline.add(csvtotupleCoordinates);
deliverPipeline.add(csvtotupleFilename);
deliverPipeline.add(teeActivity);
deliverPipeline.add(coords);
deliverPipeline.add(filename);
deliverPipeline.add(query);
deliverPipeline.add(tuplesplit);
deliverPipeline.add(listremover);
deliverPipeline.add(readfromfile);
deliverPipeline.add(controlledrepeater);
deliverPipeline.add(tee);
deliverPipeline.add(delivertoftp);
deliverPipeline.add(delivertorequeststatus);
RequestResource requestResource = mDRER.execute(deliverPipeline,
RequestExecutionType.SYNCHRONOUS);
RequestStatus status = requestResource.getRequestStatus();
//Get coordinates from tupletocsv through request status.
while(tupletocsv.hasNextResult())
{
Reader reader = tupletocsv.nextResult();
BufferedReader buffReader = new BufferedReader(reader);
String line = null;
while ((line = buffReader.readLine()) != null)
{
System.out.println(line);
}
}
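The loop above prints each CSV line as text. If the coordinate values are needed as numbers, each line can be split on commas and parsed. The following sketch uses only the standard library and makes several assumptions that should be checked against the real output: the fields are plain numbers with no quoting or embedded commas, and the caller knows whether the first line is a header. The CoordinateRows class is hypothetical, and the StringReader in main stands in for the Reader returned by nextResult().

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the OGSA-DAI client toolkit.
class CoordinateRows {

    // Parses each non-empty line of simple comma-separated numbers.
    static List<double[]> parse(Reader reader, boolean skipHeader) {
        List<double[]> rows = new ArrayList<>();
        try (BufferedReader in = new BufferedReader(reader)) {
            String line;
            boolean first = true;
            while ((line = in.readLine()) != null) {
                boolean isHeader = first && skipHeader;
                first = false;
                if (isHeader || line.trim().isEmpty()) {
                    continue;
                }
                String[] fields = line.split(",");
                double[] row = new double[fields.length];
                for (int i = 0; i < fields.length; i++) {
                    row[i] = Double.parseDouble(fields[i].trim());
                }
                rows.add(row);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return rows;
    }

    public static void main(String[] args) {
        // Stand-in for: Reader reader = tupletocsv.nextResult();
        Reader csv = new StringReader(
            "min_x,min_y,max_x,max_y,step_size\n0,0,10,20,0.5\n");
        for (double[] row : parse(csv, true)) {
            System.out.println("step_size = " + row[4]);
        }
    }
}
```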
To list the tables of a relational resource you'll need the GetAvailableTables activity. The client toolkit class representing this activity is:
uk.org.ogsadai.client.toolkit.activities.sql.GetAvailableTables
To build a workflow to list the tables of a relational resource and return these synchronously to a client you need to create a GetAvailableTables activity and connect its output to a DeliverToRequestStatus activity e.g.
// Create activities
GetAvailableTables getTables = new GetAvailableTables();
getTables.setResourceID(myRelationalResourceID);
DeliverToRequestStatus deliverToRequestStatus = new DeliverToRequestStatus();
deliverToRequestStatus.connectInput(getTables.getDataOutput());
// Create workflow
PipelineWorkflow pipeline = new PipelineWorkflow();
pipeline.add(getTables);
pipeline.add(deliverToRequestStatus);
You can then submit this workflow to a DRER in the usual way e.g.:
RequestResource requestResource =
mDRER.execute(pipeline, RequestExecutionType.SYNCHRONOUS);
RequestStatus status = requestResource.getRequestStatus();
You can then get the table names in two ways. One way is to get them as an array via the client toolkit representation of GetAvailableTables e.g.
String[] tableNames = getTables.nextDataAsArray();
for (int i = 0; i < tableNames.length; i++)
{
System.out.println("Table: " + tableNames[i]);
}
Another way is to get them one by one using an iterator provided by the client toolkit representation of GetAvailableTables e.g.
DataIterator dataList = getTables.nextData();
List myList = new ArrayList();
while (dataList.hasNext())
{
    myList.add(dataList.next());
}
for (int i = 0; i < myList.size(); i++)
{
    System.out.println("Table: " + myList.get(i));
}
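The iterator listing above collects the values in an untyped List. As a hedged sketch, the same hasNext()/next() loop can fill a List of String instead. This assumes the values can be reached through a java.util.Iterator, which is an assumption: DataIterator may need a small adapter. TableNameCollector and toNames are hypothetical names, and the table names in main are placeholders.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical helper, not part of the OGSA-DAI client toolkit.
class TableNameCollector
{
    // Drains the iterator, converting each value to its string form.
    static List<String> toNames(Iterator<?> values)
    {
        List<String> names = new ArrayList<>();
        while (values.hasNext())
        {
            names.add(String.valueOf(values.next()));
        }
        return names;
    }

    public static void main(String[] args)
    {
        // Placeholder values standing in for the data returned by getTables.nextData().
        Iterator<?> data = Arrays.asList("littleblackbook", "anothertable").iterator();
        for (String name : toNames(data))
        {
            System.out.println("Table: " + name);
        }
    }
}
```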
To get the schema of a table in a relational resource you'll need the ExtractTableSchema and TableMetadataToXMLCharArrays activities. The client toolkit classes representing these activities are:
uk.org.ogsadai.client.toolkit.activities.sql.ExtractTableSchema
uk.org.ogsadai.client.toolkit.activities.transform.TableMetadataToXMLCharArrays
To build a workflow to get the schema of a table and return it synchronously to a client you need to create an ExtractTableSchema activity, which gets the table schema, connect its output to a TableMetadataToXMLCharArrays activity, which converts the schema to XML, and connect that activity's output to a DeliverToRequestStatus activity e.g.
// Create activities
ExtractTableSchema extractTableSchema = new ExtractTableSchema();
extractTableSchema.setResourceID(mDataResourceID);
// The name of the table for which the schema is required.
extractTableSchema.addName("littleblackbook");
TableMetadataToXMLCharArrays schemaData = new TableMetadataToXMLCharArrays();
schemaData.connectDataInput(extractTableSchema.getDataOutput());
DeliverToRequestStatus deliverToRequestStatus = new DeliverToRequestStatus();
deliverToRequestStatus.connectInput(schemaData.getResultOutput());
// Create workflow
PipelineWorkflow pipeline = new PipelineWorkflow();
pipeline.add(extractTableSchema);
pipeline.add(schemaData);
pipeline.add(deliverToRequestStatus);
You can then submit this workflow to a DRER in the usual way e.g.:
RequestResource requestResource =
mDRER.execute(pipeline, RequestExecutionType.SYNCHRONOUS);
RequestStatus status = requestResource.getRequestStatus();
You can then get the table schema. There are a number of classes that represent table schema information e.g.
import uk.org.ogsadai.converters.databaseschema.ColumnMetaData;
import uk.org.ogsadai.converters.databaseschema.KeyMetaData;
import uk.org.ogsadai.converters.databaseschema.TableMetaData;
The client toolkit representation of TableMetadataToXMLCharArrays
provides a method that converts the XML representation of the schema
into a TableMetaData object.
A simple example of using this to display the table schema is as follows. Note how columns are indexed from 1..N and not 0..N-1 where N is the number of columns.
TableMetaData tableMetaData = schemaData.nextResultAsTableMetaData();
System.out.println("Name: " + tableMetaData.getName());
System.out.println("Schema name: " + tableMetaData.getSchemaName());
System.out.println("Catalog name: " + tableMetaData.getCatalogName());
int numCols = tableMetaData.getColumnCount();
System.out.println("Column count: " + numCols);
// Columns are numbered from 1 to numCols inclusive.
for (int i = 1; i <= numCols; i++)
{
    ColumnMetaData cmd = tableMetaData.getColumn(i);
    System.out.println(" Column: " + i + " " + cmd.getName() + " " + cmd.getFullName());
    System.out.println(" Data type: " + cmd.getDataType() + " SQL type: " + cmd.getSQLType() +
        " Size: " + cmd.getColumnSize() + " Position: " + cmd.getPosition() +
        " Is primary key?: " + cmd.isPrimaryKey());
}
Note how the ExtractTableSchema activity takes the table name as an input. You can therefore use this in conjunction with the results of the GetAvailableTables activity to get the meta-data for a number of tables. One simple way of doing this is to combine the code of Section 23.13, “How to list the tables of a relational resource” with that of this section so that your client gets a list of all the available tables and then gets the schema for each in turn.
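The control flow of that combination can be sketched as follows. This is only an outline under stated assumptions: SchemaCollector and collect are hypothetical names, the list of names stands in for the output of GetAvailableTables, and the Function stands in for code that builds and executes an ExtractTableSchema workflow for a single table. No client toolkit calls are made here.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical helper, not part of the OGSA-DAI client toolkit.
class SchemaCollector
{
    // Calls fetchSchema once per table name and keeps the results in table order.
    static <S> Map<String, S> collect(List<String> tableNames,
                                      Function<String, S> fetchSchema)
    {
        Map<String, S> schemas = new LinkedHashMap<>();
        for (String table : tableNames)
        {
            schemas.put(table, fetchSchema.apply(table));
        }
        return schemas;
    }

    public static void main(String[] args)
    {
        // Placeholder names standing in for the result of GetAvailableTables.
        List<String> tables = Arrays.asList("littleblackbook", "anothertable");
        // Stand-in for executing an ExtractTableSchema workflow for one table.
        Function<String, String> fetch = name -> "schema of " + name;
        collect(tables, fetch).forEach(
            (name, schema) -> System.out.println(name + ": " + schema));
    }
}
```

In a real client the function body would create a new ExtractTableSchema pipeline per table, as shown earlier in this section, and return the resulting TableMetaData object.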
The OGSA-DAI project has a number of scenarios and example workflows, implemented using the client toolkit, as part of its system test framework. Over time we expect to turn these into examples, include them in OGSA-DAI releases and describe them in the user documentation. In the meantime we will incrementally place our system test classes on our web site at:
http://www.ogsadai.org.uk/documentation/ogsadai3.0/scenarios
Along with these we will place information on how you can cannibalise our test classes for use in your own clients.