Purpose of the article:
This article aims to show how the architecture of a distributed computing system can be structured for purposes such as scientific research (for example, the distributed processing of DNA segments for fetal disease analysis or cancer research).
No special hardware or software is needed, nor many years of Java programming experience,
although I must admit that distributed programming and RMI (Remote Method Invocation) belong to the topics usually covered under "Advanced Programming". In my opinion, however, it is a manageable challenge.
Enjoy the reading.
What is Grid Computing?
Grid computing systems, or grid systems, are distributed computing infrastructures used for processing large amounts of data through the use of a vast amount of resources. In particular, these systems allow the coordinated sharing of resources within a virtual organization.
How is it possible to implement this kind of architecture in Java, using standard computers, in a "home made" way?
We are going to dive into one of the most important pioneering topics of research in the field of networks and the Internet.
To create a globally extended grid computing system we need: a Java Virtual Machine (Java 6 in particular is required; not tested with OpenJDK), an operating system that supports Java (Mac OS X, Linux, Windows, Solaris, etc.), a computer with a good amount of RAM (1 GB), an IDE for Java programming (Eclipse is recommended) and, last but certainly not least, a lot of patience.
The distributed architecture that I am going to show in this article is very simplistic. It consists of a central node, which we call the Server, that provides a naming service (Name Server) used to query the various nodes of the network and that also acts as the Resource Broker (a distributed resource scheduler able to choose which node a calculation is assigned to). All the other nodes (the computers in the network) are clients of the central server, but since they provide the calculation service, we cannot think of a Grid architecture as a simple Client/Server architecture; it is something closer to a hybrid Peer to Peer system.
Each node receives a task (job) to be completed in a time T_elab. This time must be greater than the time needed to transfer the data parameters to the distributed method at the maximum speed that the network supports, i.e. T_elab > (Data * Frequency); otherwise using a system like Grid Computing is not convenient.
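As a rough sketch of this rule, one could check whether a job is worth sending to a remote node before dispatching it. The sketch assumes that the transfer time can be estimated as the amount of data divided by the network bandwidth; this estimate and the names GridConvenience, transferSeconds and isWorthDistributing are illustrative assumptions and are not part of SPTP. It also ignores latency and the time needed to send the result back.

```java
/** Illustrative helper: decides whether sending a job to a remote node pays off. */
final class GridConvenience {

    /** Estimated time, in seconds, to move dataBytes over a link of the given speed. */
    static double transferSeconds(long dataBytes, double bytesPerSecond) {
        if (bytesPerSecond <= 0) {
            throw new IllegalArgumentException("bandwidth must be positive");
        }
        return dataBytes / bytesPerSecond;
    }

    /** True only when the computation lasts longer than the data transfer. */
    static boolean isWorthDistributing(double elaborationSeconds,
                                       long dataBytes, double bytesPerSecond) {
        return elaborationSeconds > transferSeconds(dataBytes, bytesPerSecond);
    }

    public static void main(String[] args) {
        long data = 10L * 1024 * 1024;   // 10 MiB of parameters (assumed)
        double bandwidth = 1024 * 1024;  // 1 MiB/s link (assumed), so 10 s of transfer
        System.out.println(isWorthDistributing(60.0, data, bandwidth)); // prints true
        System.out.println(isWorthDistributing(5.0, data, bandwidth));  // prints false
    }
}
```

With these assumed numbers, a job that computes for 60 seconds is worth distributing, while a 5 second job costs more to ship than to run locally.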
How to implement it in Java
Required classes:
- DistributedServer Interface
- DistributedServerSystemImpl Class which is the implementation of the DistributedServer interface
- ResourceBroker Class
- ResourceBrokerServerName Class
The example shows the procedure to compress, encrypt, decompress and decrypt atoms of a file using 1024-bit RSA and the LZMA compression algorithm. It is basically a piece of the prototype of the distributed implementation of SPTP (Secure Parallel Distributed Transfer Protocol), which I invented about one year ago.
DistributedServer Interface
This interface has two main methods, createDataPackage and unpackData, which respectively compress and encrypt, or decrypt and decompress, atoms using the LZMA and RSA algorithms. These methods can be replaced by the calculation methods you want to distribute. In Java, you can also pass as a parameter anything that is serializable. Serializable tasks are shown in the official Oracle example.
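To illustrate the idea of passing a serializable task, here is a minimal sketch modelled on the Task interface of the Oracle RMI tutorial. The classes SumTask and TaskDemo are hypothetical and are not part of SPTP. Instead of a real remote call, the sketch copies the task through Java serialization, which is the mechanism RMI uses to pass non-remote parameters.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** A unit of work that can travel to another node because it is Serializable. */
interface Task<T> extends Serializable {
    T execute();
}

/** Hypothetical example task: sums the integers in a closed range. */
final class SumTask implements Task<Long> {
    private static final long serialVersionUID = 1L;
    private final int from;
    private final int to;

    SumTask(int from, int to) {
        this.from = from;
        this.to = to;
    }

    @Override
    public Long execute() {
        long sum = 0;
        for (int i = from; i <= to; i++) {
            sum += i;
        }
        return sum;
    }
}

final class TaskDemo {
    /** Serializes and deserializes a task, as RMI does with its parameters. */
    @SuppressWarnings("unchecked")
    static <T> Task<T> copyThroughSerialization(Task<T> task)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        ObjectOutputStream out = new ObjectOutputStream(bytes);
        out.writeObject(task);
        out.close();
        ObjectInputStream in = new ObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray()));
        try {
            return (Task<T>) in.readObject();
        } finally {
            in.close();
        }
    }

    public static void main(String[] args) throws Exception {
        Task<Long> received = copyThroughSerialization(new SumTask(1, 100));
        System.out.println(received.execute()); // prints 5050
    }
}
```

A remote method could then accept a Task parameter and simply call execute() on it, so that the node does not need to know in advance which calculation it will run.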
package SPTP.protocol.GridComputing;
import java.rmi.Remote;
import java.rmi.RemoteException;
import java.security.KeyPair;
import java.security.PublicKey;
import SPTP.protocol.CommonClasses.*;
public interface DistributedServer extends Remote {
public PackageStream createDataPackage(byte[] dataToSend,int packageID,
Compression.compressionType type, PublicKey key,
final int ATOMIC_SIZE, ListOfPackagePOJO listOfPackage,
String NUM_OF_THREADS) throws RemoteException;
public byte[] unpackData(PackageStream pkg, int packageID, KeyPair keys,
int packageSize, Compression.compressionType type)
throws RemoteException;
/**
* Greater is the value returned, more possibility it has to be chosen by
* the Resource Broker when compared with other nodes.
* @return value of free available processors and resources (greater is better)
* @throws RemoteException
*/
public int getWorkload() throws RemoteException;
public boolean youLive() throws RemoteException;
}
DistributedServerSystemImpl Class
This class is not only the implementation of the DistributedServer interface; it is also the implementation of the node itself, that is, the software that must run on each of the many client computers. In particular, pay attention to the implementation of main.
In main, the client tries to log into the server (whose IP address is passed as the argument of main). Once logged in, it creates a registry for the distributed objects it must serve and waits for incoming calls from the server. To close the node, the user must type "exit" on the console.
The example refers to a few external classes developed by me; you can find these classes in the SPTP package found at this address.
package SPTP.protocol.GridComputing;
import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.InetAddress;
import java.net.Socket;
import java.net.UnknownHostException;
import java.rmi.Naming;
import java.rmi.RemoteException;
import java.rmi.RMISecurityManager;
import java.rmi.server.UnicastRemoteObject;
import java.rmi.registry.*;
import java.security.KeyPair;
import java.security.PublicKey;
import java.util.ArrayList;
import SPTP.protocol.CommonClasses.Compression;
import SPTP.protocol.CommonClasses.ListOfPackagePOJO;
import SPTP.protocol.CommonClasses.PackageStream;
import SPTP.protocol.CommonClasses.RSA2;
import SPTP.protocol.CommonClasses.Utility;
import SPTP.protocol.CommonClasses.Compression.compressionType;
public class DistributedServerSystemImpl extends UnicastRemoteObject
implements DistributedServer{
/**
* SERIAL VERSION ID
*/
private static final long serialVersionUID = 7165234451015536983L;
private int workload ;
private int numOfProcessors;
String nodeName = "Node";
public DistributedServerSystemImpl() throws RemoteException
{
super();
this.numOfProcessors = Runtime.getRuntime().availableProcessors();
}
public DistributedServerSystemImpl(String nodeName) throws RemoteException
{
super();
this.numOfProcessors = Runtime.getRuntime().availableProcessors();
this.nodeName = nodeName;
}
@Override
public PackageStream createDataPackage(byte[] dataToSend, int packageID,
compressionType type, PublicKey key, int ATOMIC_SIZE,
ListOfPackagePOJO listOfPackage, String NUM_OF_THREADS)
throws RemoteException {
System.out.println("I am ready to create data package");
this.numOfProcessors--;
byte[] data2 = Compression.compress(dataToSend, type);
dataToSend = data2;
int numOfAtoms = Utility.getPackagesNumber(dataToSend.length, ATOMIC_SIZE);
ArrayList<byte[]> list = new ArrayList<byte[]>();
int lastSector = 0;
byte[] arrayTemp;
for(int i = 1; i <= numOfAtoms; i++ ){
if(i == numOfAtoms){
int newSector = lastSector + 1;
arrayTemp = new byte[((dataToSend.length) - lastSector )];
int n = 0;
for(int j = lastSector; j < dataToSend.length; j++){
arrayTemp[n] = dataToSend[j];
n++;
}
try {
list.add(RSA2.encrypt(arrayTemp, key));
} catch (Exception e) {
e.printStackTrace();
}
}else{
arrayTemp = new byte[ATOMIC_SIZE];
int m = 0;
while(lastSector < ((ATOMIC_SIZE) * i)){
arrayTemp[m] = dataToSend[lastSector];
lastSector++;
m++;
}
try {
list.add(RSA2.encrypt(arrayTemp, key));
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
PackageStream pack = new PackageStream(list,packageID);
this.numOfProcessors++;
return pack;
}
@Override
public byte[] unpackData(PackageStream pkg, int packageID, KeyPair keys, int packageSize,
compressionType type) throws RemoteException {
System.out.println("I am ready to unpack data!");
this.numOfProcessors--;
byte[] arrayTemp = new byte[0];
try{
ArrayList lista = new ArrayList();
ArrayList data = pkg.getData();
byte[] temp = new byte[0];
for(int i = 0; i < data.size(); i++){
try {
byte[] unCifred = RSA2.decrypt((byte[])data.get(i),keys);
temp = arrayTemp.clone();
arrayTemp = new byte[unCifred.length+temp.length];
int posAttuale = 0;
for(int h = 0; h < (temp.length) && temp.length > 0; h++){
arrayTemp[h] = temp[h];
posAttuale = h;
}
int m = 0;
if (i == 0) posAttuale = -1;
for(int h = posAttuale + 1; h < (unCifred.length+temp.length);h++ ){
arrayTemp[h] = unCifred[m];
m++;
}
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}catch (Exception e) {
System.err.println("An error occurred:"+e.toString());
}
byte[] toReturn = Compression.uncompress(arrayTemp, type);
this.numOfProcessors++;
return toReturn;
}
public int getWorkload() throws RemoteException{
// Cast to float first: dividing two long values would truncate the ratio.
float m = (float) Runtime.getRuntime().totalMemory()/
Runtime.getRuntime().freeMemory();
this.workload = Math.round(m * this.numOfProcessors);
return this.workload;
}
public boolean youLive() throws RemoteException{
System.out.println("Yes I am live");
return true;
}
public static void main(String args[]) throws IOException {
InetAddress localHost = null;
try {
localHost = InetAddress.getLocalHost();
} catch (UnknownHostException e2) {
// TODO Auto-generated catch block
e2.printStackTrace();
}
if(args.length > 0){
String toSend = localHost.getHostAddress();
String received;
Socket clientSocket = null;
System.out.println("Trying to connect to "+args[0].toLowerCase()+"...");
do {
try {
clientSocket = new Socket(args[0].toLowerCase(), 12999);
ObjectOutputStream outToServer = new ObjectOutputStream(clientSocket.getOutputStream());
outToServer.writeObject(toSend);
outToServer.flush();
ObjectInputStream inFromServer = new ObjectInputStream(clientSocket.getInputStream());
received = (String) inFromServer.readObject();
System.out.println("FROM SERVER: " + received);
clientSocket.close();
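} catch (Exception e) {
// Report the failure and wait a little before the next attempt.
System.err.println("Login attempt failed: "+e.getMessage());
try { Thread.sleep(1000); } catch (InterruptedException ie) { break; }
}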
} while (clientSocket == null);
}
System.setSecurityManager(new RMISecurityManager());
try {
LocateRegistry.createRegistry(1099);
System.out.println("Registry created");
DistributedServer ds = new DistributedServerSystemImpl(localHost.getHostAddress());
Naming.rebind("//"+localHost.getHostAddress()+"/Node", ds);
System.out.println("Node on "+localHost.getHostAddress()+" is ready");
BufferedReader dis = new BufferedReader(new InputStreamReader(System.in));
while (!"exit".equals(dis.readLine())) ;
UnicastRemoteObject.unexportObject(ds, false);
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
ResourceBroker Class
The Resource Broker is responsible for keeping a log on the server machine that references and tracks all the nodes connected to it. It also keeps track of which node each job is entrusted to, and it is responsible for removing nodes that are no longer used or that are currently unreachable.
The best node to send a piece of work to is chosen automatically: the Resource Broker asks each node in the list how many free resources it has (the value corresponding to free resources is the ratio between the total memory and the free memory, multiplied by the number of available processors). Then a simple linear maximum search selects the node that currently reports the most available resources.
This is only a prototype. The ResourceBroker can be greatly optimized by implementing search algorithms with a computational complexity of at most O(n log n), or by implementing more precise selection criteria based on the physical distance (in meters) between the server and the client.
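The selection rule described above can be sketched in isolation, without any network call. In this sketch the class BestNodeChooser, its method names and the sample addresses and memory figures are illustrative assumptions; in the real prototype the scores come from the remote getWorkload method of each node.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class BestNodeChooser {

    /** Score as described in the text: (total memory / free memory) * processors. */
    static int workload(long totalMemory, long freeMemory, int processors) {
        float ratio = (float) totalMemory / (float) freeMemory;
        return Math.round(ratio * processors);
    }

    /** Linear scan returning the address with the highest score, or null if empty. */
    static String chooseBest(Map<String, Integer> scores) {
        String best = null;
        int bestScore = Integer.MIN_VALUE;
        for (Map.Entry<String, Integer> entry : scores.entrySet()) {
            if (best == null || entry.getValue() > bestScore) {
                best = entry.getKey();
                bestScore = entry.getValue();
            }
        }
        return best;
    }

    public static void main(String[] args) {
        // Sample readings (memory in MB); the addresses are made up.
        Map<String, Integer> scores = new LinkedHashMap<String, Integer>();
        scores.put("192.168.1.10", workload(1024, 512, 2)); // 2.0 * 2 = 4
        scores.put("192.168.1.11", workload(2048, 512, 4)); // 4.0 * 4 = 16
        scores.put("192.168.1.12", workload(1024, 256, 1)); // 4.0 * 1 = 4
        System.out.println(chooseBest(scores)); // prints 192.168.1.11
    }
}
```

When two nodes report the same score, this sketch keeps the first one it met, which is a design choice of the example and not something the article prescribes.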
package SPTP.protocol.GridComputing;
import java.io.IOException;
import java.net.InetAddress;
import java.net.MalformedURLException;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.UnknownHostException;
import java.rmi.AccessException;
import java.rmi.Naming;
import java.rmi.NotBoundException;
import java.rmi.RMISecurityManager;
import java.rmi.RemoteException;
import java.rmi.registry.LocateRegistry;
import java.rmi.registry.Registry;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import SPTP.protocol.ServerInterface.ConnectionHandler;
public class ResourceBroker implements Runnable {
// The list holds node IP addresses, so it is typed as a list of String.
private ArrayList<String> nodeList ;
private DistributedServer preferredNode;
private boolean enabled = true;
public ResourceBroker() throws RemoteException, UnknownHostException {
nodeList = new ArrayList<String>();
System.out.println("RMI server started");
// Create and install a security manager
if (System.getSecurityManager() == null) {
System.setSecurityManager(new RMISecurityManager());
System.out.println("Security manager installed.");
}
else
System.out.println("Security manager already exists.");
try //special exception handler for registry creation
{
LocateRegistry.createRegistry(1099);
System.out.println("java RMI registry created.");
} catch (RemoteException e) {
System.out.println("java RMI registry already exists.");
}
Thread t = new Thread(this);
t.start();
}
@Override
public void run() {
InetAddress localHost = null;
try {
localHost = InetAddress.getLocalHost();
} catch (UnknownHostException e2) {
// TODO Auto-generated catch block
e2.printStackTrace();
}
Registry registry = null;
DistributedServer Node = null;
try {
registry = LocateRegistry.getRegistry(localHost.getHostAddress());
} catch (RemoteException e1) {
e1.printStackTrace();
}
try {
Node = new DistributedServerSystemImpl(localHost.getHostAddress());
} catch (RemoteException e1) {
e1.printStackTrace();
}
try {
Naming.rebind("//"+localHost.getHostAddress()+"/Node", Node);
} catch (AccessException e1) {
System.out.println("AccessException launched");
e1.printStackTrace();
} catch (RemoteException e1) {
System.out.println("Remote Exception launched");
e1.printStackTrace();
} catch (MalformedURLException e) {
e.printStackTrace();
}
synchronized (nodeList) {
nodeList.add(localHost.getHostAddress());
}
this.preferredNode = (DistributedServerSystemImpl) Node;
System.out.println("PeerServer bound in registry");
PollingNodes pollingThread = new PollingNodes(8000, "Polling Nodes begun");
ServerSocket welcomeSocket = null;
try {
welcomeSocket = new ServerSocket(12999);
ResourceBrokerServerName resName =
new ResourceBrokerServerName(this,welcomeSocket);
System.out.println("Nameserver started!");
} catch (IOException e) {
e.printStackTrace();
}
while(enabled){
try {
this.preferredNode = searchBest();
} catch (MalformedURLException e1) {
e1.printStackTrace();
} catch (RemoteException e1) {
e1.printStackTrace();
} catch (NotBoundException e1) {
e1.printStackTrace();
}catch (NullPointerException e1) {
System.out.println("Empty List");
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
private DistributedServer searchBest() throws MalformedURLException, RemoteException, NotBoundException{
DistributedServer Node = this.preferredNode;
String toReturn = "";
int bestWorkload = Integer.MIN_VALUE;
// Copy the list so that nodes can join or leave while we query them.
ArrayList<String> snapshot;
synchronized (nodeList) {
snapshot = new ArrayList<String>(nodeList);
}
for(String ip : snapshot){
DistributedServer candidate = (DistributedServer) Naming.lookup("rmi://"+
ip+"/Node");
int workload = candidate.getWorkload();
// Keep the node with the highest value (greater is better).
if(workload > bestWorkload){
bestWorkload = workload;
toReturn = ip;
Node = candidate;
}
}
System.out.println("Best node is "+toReturn+" \nWork Load: "+Node.getWorkload());
return Node ;
}
private void sort() {
synchronized (nodeList) {
Collections.sort(nodeList);
}
}
//other methods here
public void addNode(String IPNodeAddress){
addNode(IPNodeAddress,1099);
}
public void addNode(String IPNodeAddress,int Port){
try
{
synchronized (nodeList) {
nodeList.add(IPNodeAddress);
}
System.out.println("PeerServer bound in registry");
}catch (Exception e)
{
e.printStackTrace();
}
System.out.println("Number of nodes = "+nodeList.size());
}
public void deleteNode(String IPNodeAddress) throws
RemoteException, MalformedURLException, NotBoundException{
deleteNode(IPNodeAddress,1099);
}
public void deleteNode(String IPNodeAddress, int Port) throws
RemoteException, MalformedURLException, NotBoundException{
// Remove the node from the list first, so that it is dropped even if
// the unbind below fails (for example because the node is unreachable).
synchronized (nodeList) {
int h = -1;
for(int i = 0; i < nodeList.size(); i++){
if(nodeList.get(i).equals(IPNodeAddress)){
h = i;
}
}
if(h > -1) nodeList.remove(h);
}
Naming.unbind("//"+IPNodeAddress+":"+Port+"/Node");
}
public void shutdownGrid(){
this.enabled = false;
}
public DistributedServer getPreferredNode(){
return this.preferredNode;
}
class PollingNodes extends Thread {
private int delay;
private String message;
public PollingNodes(int delay, String msg) {
this.delay = delay;
message = msg;
setDaemon(true);
start();
}
public void run() {
String IP = "";
while(true){
if(nodeList.size() > 0){
for(int i = 0; i < nodeList.size(); i++){
DistributedServer Node = null;
synchronized (nodeList) {
IP = nodeList.get(i);
try {
Node = (DistributedServer) Naming.lookup("rmi://"+IP+"/Node");
} catch (MalformedURLException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (RemoteException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (NotBoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
try {
if(Node.youLive()) System.out.println("Node "+IP+" is live");
} catch (RemoteException e) {
try {
deleteNode(IP);
} catch (RemoteException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
} catch (MalformedURLException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
} catch (NotBoundException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
}
}
try {
Thread.sleep(this.delay);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
}
ResourceBrokerServerName Class
This class automatically accepts authentication requests from the nodes and records the authenticated nodes in the main server.
package SPTP.protocol.GridComputing;
import java.net.*;
import java.io.*;
public class ResourceBrokerServerName extends Thread {
private ResourceBroker scheduler = null;
private ServerSocket welcomeSocket ;
public ResourceBrokerServerName(ResourceBroker scheduler,ServerSocket welcomeSocket) {
super();
this.scheduler = scheduler;
this.welcomeSocket = welcomeSocket;
Thread t = new Thread(this);
t.start();
}
public void run() {
while(true)
{
Socket connectionSocket = null;
try {
connectionSocket = this.welcomeSocket.accept();
new HandleRequest(connectionSocket);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
public class HandleRequest extends Thread{
private Socket connection;
public HandleRequest(Socket connection){
this.connection = connection;
Thread t = new Thread(this);
t.start();
}
public void run(){
System.out.println("Request received");
String clientSentence = "";
String capitalizedSentence = "";
ObjectInputStream inFromClient = null;
try {
inFromClient = new ObjectInputStream(connection.getInputStream());
} catch (IOException e2) {
// TODO Auto-generated catch block
e2.printStackTrace();
}
ObjectOutputStream outToClient = null;
try {
outToClient = new ObjectOutputStream(connection.getOutputStream());
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
try {
clientSentence = (String) inFromClient.readObject();
} catch (IOException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
} catch (ClassNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
scheduler.addNode(clientSentence);
System.out.println("Received: " + clientSentence);
capitalizedSentence = "Inserted";
try {
outToClient.writeObject(capitalizedSentence);
outToClient.flush();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
Launching our Grid Computing environment
Create the file java.policy to grant all the remote invocation permissions:
grant {
permission java.security.AllPermission;
};
To use the developed classes, we must change the previously implemented classes so that they invoke the distributed methods createDataPackage and unpackData. Each node also needs to be started with the command:
java -Djava.security.policy=java.policy -jar SPTP_DistrNode.jar 192.168.1.100
where 192.168.1.100 is the IP address of the server.
Once the server is active, the node will authenticate automatically.
The SPTP_DistrNode.jar package exports DistributedServerSystemImpl as its executable class and also contains all the other classes.
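As a rough illustration of how calling code obtains a node and invokes a distributed method, the following sketch runs a registry, a node and a caller inside a single JVM on the loopback address. It is a simplified stand-in and not the SPTP code: the interface WorkloadNode, the classes FixedWorkloadNode and LocalGridDemo and the port 10990 are assumptions made for the example, and the security manager and policy file are left out for brevity.

```java
import java.rmi.Remote;
import java.rmi.RemoteException;
import java.rmi.registry.LocateRegistry;
import java.rmi.registry.Registry;
import java.rmi.server.UnicastRemoteObject;

/** Cut-down stand-in for DistributedServer with a single remote method. */
interface WorkloadNode extends Remote {
    int getWorkload() throws RemoteException;
}

/** Hypothetical node that always reports the same workload. */
final class FixedWorkloadNode implements WorkloadNode {
    private final int workload;

    FixedWorkloadNode(int workload) {
        this.workload = workload;
    }

    @Override
    public int getWorkload() {
        return workload;
    }
}

final class LocalGridDemo {
    /** Binds a node under the name "Node", looks it up and calls it through RMI. */
    static int bindLookupAndCall(int registryPort, int workload) throws Exception {
        // Keep everything on the loopback interface for this local test.
        System.setProperty("java.rmi.server.hostname", "127.0.0.1");
        Registry registry = LocateRegistry.createRegistry(registryPort);
        FixedWorkloadNode node = new FixedWorkloadNode(workload);
        try {
            // Port 0 lets RMI pick any free port for the exported object.
            Remote stub = UnicastRemoteObject.exportObject(node, 0);
            registry.rebind("Node", stub);

            // What a caller does: obtain the stub, then invoke the method.
            Registry client = LocateRegistry.getRegistry("127.0.0.1", registryPort);
            WorkloadNode remote = (WorkloadNode) client.lookup("Node");
            return remote.getWorkload();
        } finally {
            // Release the exported objects so that the JVM can exit.
            UnicastRemoteObject.unexportObject(node, true);
            UnicastRemoteObject.unexportObject(registry, true);
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(bindLookupAndCall(10990, 8)); // prints 8
    }
}
```

In the prototype the caller would instead ask the ResourceBroker for getPreferredNode() and call createDataPackage or unpackData on the returned object, but the lookup-then-invoke pattern is the same.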
Final Author's thoughts
This is only a prototype of distributed computing. The example shown does not need any supercomputer, nor expensive IBM-style grid computing.
You just need the cooperation of the entire scientific community. A similar protocol could be implemented in sites like Wikipedia: while we draw on scientific information, we could also tacitly collaborate in scientific research by providing a small slice of our computing power to solve computational problems, joining the research in an "ACTIVE WAY".
The European Community will allocate 25 million Euros for the development of the EGI (European Grid Infrastructure). I think that all this money could be saved by implementing an architecture similar to the one presented in this article, even using obsolete computers produced from 2001 onwards. In this way we would solve two problems in one fell swoop: Grid Computing for scientific research and the pollution caused by the disposal of obsolete computers.
Please forgive the formatting errors and the bad indentation; they are due to the restricted space available for posts on the blog.
Future revisions of this article are planned.
Vincenzo Dentamaro IT Professional Developer - University of Bari - Italy