org.objectweb.joram.mom.dest

Class ClusterQueueImpl

Implemented Interfaces:
java.io.Serializable

public class ClusterQueueImpl
extends QueueImpl

The ClusterQueueImpl class implements the MOM queue behaviour: it stores messages and delivers them upon client requests, or forwards them to another queue of the cluster.

See Also:
Serialized Form

Field Summary

protected Hashtable
clusters
Maps the agentId of each ClusterQueue (key) to its rateOfFlow (value, a Float).
protected LoadingFactor
loadingFactor
Used to compute the loading factor, the overloaded state, ...
protected long
period
Period at which the loading factor is evaluated.
protected long
waitAfterClusterReq
Time to wait after a cluster request.

Fields inherited from class org.objectweb.joram.mom.dest.QueueImpl

arrivalsCounter, consumers, contexts, deliveredMsgs, messages, persistenceModule, receiving, requests

Fields inherited from class org.objectweb.joram.mom.dest.DestinationImpl

READ, READWRITE, WRITE, adminId, clients, destId, dmqId, freeReading, freeWriting

Constructor Summary

ClusterQueueImpl(AgentId destId, AgentId adminId, long period, int producThreshold, int consumThreshold, boolean autoEvalThreshold, long waitAfterClusterReq)
Constructs a ClusterQueueImpl instance.

Method Summary

protected void
addQueueCluster(String joiningQueue, float rateOfFlow)
Sends a JoinQueueCluster notification to joiningQueue.
protected void
broadcastLeave(String removeQueue)
Broadcasts to the cluster that removeQueue is leaving.
protected Object
doList(ListClusterQueue req)
Returns the cluster list (a Vector).
protected void
doProcess(ClientMessages not)
Overrides doProcess(ClientMessages): stores every msgId in timeTable and visitTable, stores the messages, and delivers them if a consumer is waiting.
protected void
doProcess(SetRightRequest not)
Propagates the right to every queue of the cluster.
protected void
doReact(AgentId from, LBCycleLife not)
The messages were not consumed by another queue of the cluster within the period; tries to consume them in this queue.
protected void
doReact(AgentId from, LBMessageGive not)
Load-balancing messages given by another cluster queue.
protected void
doReact(AgentId from, LBMessageHope not)
Load-balancing messages hoped for by the "from" queue.
protected void
doReact(AckJoinQueueCluster not)
protected void
doReact(JoinQueueCluster not)
A new queue joins the cluster; updates clusters.
protected void
doReact(ReceiveRequest not)
protected void
doReact(SetRightQueueCluster not)
Sets the same right on every queue of the cluster.
protected void
doReact(WakeUpNot not)
Wakes up and calls factorCheck to evaluate the loading factor...
long
getClusterDeliveryCount()
Returns the number of messages sent to the cluster.
int
getNumberOfPendingMessages()
int
getNumberOfPendingRequests()
protected void
messageDelivered(String msgId)
Called in deliverMessages just after channel.sendTo(msg); override this method to perform specific processing.
protected void
messageRemoved(String msgId)
Called in deliverMessages just after an invalid message is removed; override this method to perform specific processing.
protected void
messageSendToCluster(String msgId)
void
react(AgentId from, Notification not)
Distributes the received notifications to the appropriate reactions.
protected void
removeQueueCluster(String removeQueue)
removeQueue leaves the cluster.
protected void
sendToCluster(QueueClusterNot not)
Sends the notification to every queue of the cluster.
protected Object
specialAdminProcess(SpecialAdminRequest not)
Used to add a ClusterQueue to the cluster or to remove one from it.
protected void
specialProcess(Notification not)
Implements special processing (see QueueImpl).
String
toString()

Methods inherited from class org.objectweb.joram.mom.dest.QueueImpl

deliverMessages, doProcess, doProcess, doProcess, doProcess, doReact, doReact, doReact, doReact, doReact, doReact, doReact, doReact, isUndeliverable, messageDelivered, messageRemoved, react, specialProcess, storeMessage, toString

Methods inherited from class org.objectweb.joram.mom.dest.DestinationImpl

canBeDeleted, doReact, doReact, doReact, doReact, doReact, doReact, doReact, doReact, doReact, doReact, isAdministrator, isReader, isWriter, processSetRight, react, sendToDMQ, specialAdminProcess, specialProcess

Field Details

clusters

protected Hashtable clusters
Maps the agentId of each ClusterQueue (key) to its rateOfFlow (value, a Float).
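
To make the shape of this table concrete, here is a minimal sketch. It is not the Joram source: the class name ClustersTableSketch and its methods join, leave and rateOf are hypothetical, and it assumes that agent identifiers are held in their String form and that an unknown queue reports a rate of 0.

```java
import java.util.Hashtable;

// Hypothetical stand-in for the clusters field; not the Joram implementation.
class ClustersTableSketch {
    // key = agentId of a ClusterQueue (String form assumed), value = rateOfFlow
    final Hashtable<String, Float> clusters = new Hashtable<>();

    /** Records a queue and its rate of flow (replaces any previous entry). */
    void join(String queueId, float rateOfFlow) {
        clusters.put(queueId, Float.valueOf(rateOfFlow));
    }

    /** Forgets a queue that left the cluster. */
    void leave(String queueId) {
        clusters.remove(queueId);
    }

    /** Returns the recorded rate of flow, or 0 for an unknown queue (assumption). */
    float rateOf(String queueId) {
        Float rate = clusters.get(queueId);
        return rate == null ? 0f : rate.floatValue();
    }
}
```

Hashtable rejects null keys and values, so in this sketch a queue is either present with a rate or absent from the table.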


loadingFactor

protected LoadingFactor loadingFactor
Used to compute the loading factor, the overloaded state, ...


period

protected long period
Period at which the loading factor is evaluated.


waitAfterClusterReq

protected long waitAfterClusterReq
Time to wait after a cluster request.

Constructor Details

ClusterQueueImpl

public ClusterQueueImpl(AgentId destId,
                        AgentId adminId,
                        long period,
                        int producThreshold,
                        int consumThreshold,
                        boolean autoEvalThreshold,
                        long waitAfterClusterReq)
Constructs a ClusterQueueImpl instance.

Parameters:
destId - Identifier of the agent hosting the queue.
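adminId - Identifier of the administrator of the queue.
period - Period at which the loading factor is evaluated.
waitAfterClusterReq - Time to wait after a cluster request.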

Method Details

addQueueCluster

protected void addQueueCluster(String joiningQueue,
                               float rateOfFlow)
Sends a JoinQueueCluster notification to joiningQueue.


broadcastLeave

protected void broadcastLeave(String removeQueue)
Broadcasts to the cluster that removeQueue is leaving.


doList

protected Object doList(ListClusterQueue req)
Returns the cluster list (a Vector).


doProcess

protected void doProcess(ClientMessages not)
Overrides doProcess(ClientMessages): stores every msgId in timeTable and visitTable, stores the messages, and delivers them if a consumer is waiting. Then calls factorCheck to evaluate the loading factor, activity, ... and sends messages to the cluster if needed.
Overrides:
doProcess in class QueueImpl


doProcess

protected void doProcess(SetRightRequest not)
Propagates the right to every queue of the cluster.
Overrides:
doProcess in class QueueImpl


doReact

protected void doReact(AgentId from,
                       LBCycleLife not)
The messages were not consumed by another queue of the cluster within the period; tries to consume them in this queue. Updates visitTable and processes the ClientMessages.


doReact

protected void doReact(AgentId from,
                       LBMessageGive not)
            throws UnknownNotificationException
Load-balancing messages given by another cluster queue. Processes the ClientMessages; there is no need to check whether the sender is a writer.


doReact

protected void doReact(AgentId from,
                       LBMessageHope not)
Load-balancing messages hoped for by the "from" queue.


doReact

protected void doReact(AckJoinQueueCluster not)


doReact

protected void doReact(JoinQueueCluster not)
A new queue joins the cluster: updates clusters and spreads the AckjoiningQueue to the cluster.


doReact

protected void doReact(ReceiveRequest not)


doReact

protected void doReact(SetRightQueueCluster not)
Sets the same right on every queue of the cluster.


doReact

protected void doReact(WakeUpNot not)
Wakes up and calls factorCheck to evaluate the loading factor... If a message has stayed in timeTable for more than one period, sends it to another queue of the cluster that it has not visited yet.
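
The forwarding rule described here can be illustrated with a minimal sketch. It is not the Joram implementation: the class WakeUpSketch, its methods, and the concrete types chosen for timeTable (msgId to arrival time) and visitTable (msgId to the set of queues already visited) are assumptions made for the example, and the factorCheck step is left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the wake-up forwarding rule; not the Joram source.
class WakeUpSketch {
    final long period;
    final List<String> clusterQueues = new ArrayList<>();          // ids of the queues in the cluster
    final Map<String, Long> timeTable = new HashMap<>();           // msgId -> arrival time (assumed)
    final Map<String, Set<String>> visitTable = new HashMap<>();   // msgId -> visited queues (assumed)

    WakeUpSketch(long period) {
        this.period = period;
    }

    /** Records a message arriving in the local queue at time now. */
    void store(String msgId, long now, String localQueueId) {
        timeTable.put(msgId, now);
        visitTable.computeIfAbsent(msgId, k -> new HashSet<>()).add(localQueueId);
    }

    /** Returns msgId -> target queue for every message forwarded at this wake-up. */
    Map<String, String> wakeUp(long now) {
        Map<String, String> forwarded = new HashMap<>();
        for (Map.Entry<String, Long> entry : timeTable.entrySet()) {
            if (now - entry.getValue() <= period) {
                continue; // the message has not waited longer than one period
            }
            Set<String> visited = visitTable.get(entry.getKey());
            for (String queue : clusterQueues) {
                if (!visited.contains(queue)) {
                    visited.add(queue);                 // mark the target as visited
                    forwarded.put(entry.getKey(), queue);
                    break;                              // one target per message
                }
            }
        }
        // Forwarded messages are no longer timed locally (assumption).
        timeTable.keySet().removeAll(forwarded.keySet());
        return forwarded;
    }
}
```

A message whose visit set already covers every queue of the cluster stays where it is in this sketch.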


getClusterDeliveryCount

public long getClusterDeliveryCount()
Returns the number of messages sent to the cluster.


getNumberOfPendingMessages

public int getNumberOfPendingMessages()


getNumberOfPendingRequests

public int getNumberOfPendingRequests()


messageDelivered

protected void messageDelivered(String msgId)
Called in deliverMessages just after channel.sendTo(msg); override this method to perform specific processing.
Overrides:
messageDelivered in class QueueImpl
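
The hook mechanism described here can be sketched as follows. The classes below are hypothetical stand-ins, not the Joram classes, and the idea that the overriding queue drops its per-message bookkeeping on delivery is an assumption made for the example.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-ins illustrating the delivery hook; not the Joram classes.
class DeliveryHookSketch {

    /** Plays the role of the parent queue: delivers, then calls the hook. */
    static class BaseQueue {
        final List<String> pending = new ArrayList<>();

        void deliverMessages() {
            // Iterate over a copy so entries can be removed while delivering.
            for (String msgId : new ArrayList<>(pending)) {
                pending.remove(msgId);     // stands in for channel.sendTo(msg)
                messageDelivered(msgId);   // hook, called just after the send
            }
        }

        /** Default hook: no specific processing. */
        protected void messageDelivered(String msgId) { }
    }

    /** Plays the role of the cluster queue: overrides the hook. */
    static class TrackingQueue extends BaseQueue {
        final Set<String> tracked = new HashSet<>();

        void store(String msgId) {
            pending.add(msgId);
            tracked.add(msgId);
        }

        @Override
        protected void messageDelivered(String msgId) {
            tracked.remove(msgId); // assumption: tracking data is dropped once delivered
        }
    }
}
```

Keeping the hook empty in the parent class lets a subclass add its own processing without touching the delivery loop.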


messageRemoved

protected void messageRemoved(String msgId)
Called in deliverMessages just after an invalid message is removed; override this method to perform specific processing.
Overrides:
messageRemoved in class QueueImpl


messageSendToCluster

protected void messageSendToCluster(String msgId)


react

public void react(AgentId from,
                  Notification not)
            throws UnknownNotificationException
Distributes the received notifications to the appropriate reactions.
Overrides:
react in class QueueImpl

Throws:
UnknownNotificationException - When receiving an unexpected notification.
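
One common way to distribute notifications to reactions is a type test per notification class, with a fallback to the parent class. The sketch below illustrates that pattern only and is an assumption about the approach, not the Joram source: every class in it, including the nested UnknownNotificationException and the BaseOnlyNot notification, is a hypothetical stand-in, and the from argument is omitted.

```java
// Hypothetical sketch of notification dispatch; not the Joram implementation.
class ReactDispatchSketch {
    static class Notification { }
    static class BaseOnlyNot extends Notification { }      // handled by the parent only
    static class WakeUpNot extends Notification { }
    static class JoinQueueCluster extends Notification { }

    static class UnknownNotificationException extends Exception {
        UnknownNotificationException(String message) {
            super(message);
        }
    }

    /** Plays the role of the parent queue. */
    static class BaseQueue {
        String lastReaction = "none";

        void react(Notification not) throws UnknownNotificationException {
            if (not instanceof BaseOnlyNot) {
                lastReaction = "base:handled";
            } else {
                // Nothing in the hierarchy knows this notification.
                throw new UnknownNotificationException(not.getClass().getSimpleName());
            }
        }
    }

    /** Plays the role of the cluster queue: handles its own types first. */
    static class ClusterQueue extends BaseQueue {
        @Override
        void react(Notification not) throws UnknownNotificationException {
            if (not instanceof JoinQueueCluster) {
                lastReaction = "cluster:join";
            } else if (not instanceof WakeUpNot) {
                lastReaction = "cluster:wakeUp";
            } else {
                super.react(not); // defer everything else to the parent
            }
        }
    }
}
```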


removeQueueCluster

protected void removeQueueCluster(String removeQueue)
removeQueue leaves the cluster.


sendToCluster

protected void sendToCluster(QueueClusterNot not)
Sends the notification to every queue of the cluster.
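
A broadcast over the clusters table might look like the sketch below. It is not the Joram source: the class SendToClusterSketch is hypothetical, the notification is reduced to a String payload, sending is simulated by recording the target, and skipping the local queue is an assumption that the original text does not confirm.

```java
import java.util.ArrayList;
import java.util.Hashtable;
import java.util.List;

// Hypothetical sketch of a cluster-wide broadcast; not the Joram implementation.
class SendToClusterSketch {
    final String localId;
    final Hashtable<String, Float> clusters = new Hashtable<>(); // queue id -> rateOfFlow
    final List<String> sent = new ArrayList<>();                 // records "target:payload"

    SendToClusterSketch(String localId) {
        this.localId = localId;
    }

    /** Sends the payload to every queue recorded in clusters. */
    void sendToCluster(String payload) {
        for (String target : clusters.keySet()) {
            if (target.equals(localId)) {
                continue; // assumption: a queue does not send to itself
            }
            sent.add(target + ":" + payload); // stands in for the real send
        }
    }
}
```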


specialAdminProcess

protected Object specialAdminProcess(SpecialAdminRequest not)
            throws RequestException
Used to add a ClusterQueue to the cluster or to remove one from it.
Overrides:
specialAdminProcess in class DestinationImpl


specialProcess

protected void specialProcess(Notification not)
Implements special processing (see QueueImpl).
Overrides:
specialProcess in class QueueImpl


toString

public String toString()
Overrides:
toString in class QueueImpl


Copyright © 2004 Scalagent - All rights reserved