AgE 2.5 : Communication based on JMS

Contents:

Documentation based on Konrad Kaplita and Maciej Rząsa work. Original docs are at AgE Trac

API Specification

public interface ICommunicationService {
  void send (Message<IComponentAddress> msg);
  Message<IComponentAddress> receive(String componentId);
}

(rezygnujemy ze "status")

public interface IComponentAddress extends IAddress {
  public INodeAddress getNodeAddress();
  public String getComponentId();
}

public class Message<A extends IAddress>{
  public IHeader<A> getHeader();
  public Object getContents();
}

public interface IHeader<A extends IAddress> {
  public A getSenderAddress();
  public IAddressSelector<A> getReceiverAddress();
}

public interface IMessageListener<A extends IAddress>{
  void deliver(Message<A> message);
}

Architecture

Diagram komponentów

Stworzony komponent udostępnia API komunikacyjne ICommunicationComponent, sam zaś korzysta z interfejsu JMS (do komunikacji międzu węzłami) oraz JNDI (do rozdzielenia adresacji logicznej od fizycznej).

Diagram wdrożenia

Na każdym węźle, na którym działa CommunicationComponent uruchamiany jest JMS. Ponadto na jednym ogólnie znanym węźle działa usługa JNDI. Oba te serwisy uruchamiane są podczas inicjalizaji komponentu i nie wymagają dodatkowych działań administracyjnych.

Diagram klas

Communicator: udostępnia ustalone API komunikacyjne. Ponadto umożliwia pobranie adresów dostępnych węzłów z katalogu JNDI – metoda getAvailaibleNodes (przydatne podczas testowania).

JMSEndpoint: umożliwia komunikację z konkretną instancją brokera JMS. Jedna instacja JMSEndpoint jest osadzona na stałe w każdym obiekcie Communicator i pozwala na komunikację z lokalną usługą JMS. Druga jest tworzona przy każdym wywołaniu send i umożliwia komunikację z serwisem odbiorcy. Ponadto klasa ta implementuje wzorzec Observer powiadamiąc wszystkie elementy z listy listeners o nadchodzącej wiadomości.

JNDIConnector: singleton pozwalający na pobranie bieżącego kontekstu JNDI.

BrokerAddress: klasa opakowująca konfigurację brokera konieczną do stworzenia obiektu JMSEndpoint komunikującego się z nim. Jest rejestrowana w usłudze JNDI.

Header, CommnicationServiceMessage : klasy opakowujące przesyłany obiekt i pozwalające na identyfikację nadawcy.

Sequence diagrams

Inicjalizjacja

void init()

Komponent startująć startuje lokalną usługę JMS (localBroker). Następnie na podstawie jej danych (brokerAddress) tworzy localEndpoint służący do odbierania wiadomości. Rejestruje go w JNDI jako nazwę używając reprezentacji NodeAddress.

W końcowej implemetacji dodano wysyłanie wiadomości, które zostały zgłoszone do przesłania przed końcem inicjalizacji komponentu komunikacyjnego.

Wysyłanie

IStatusMonitor send(IHeader header, Object payload)

Komponent komunikacyjny wysyłającego pobiera dane odbiorcy z JNDI i tworzy remoteEndpoint. Następnie za jego pomocą wysyła wiadomość do kolejki umieszczonej w brokerze osadzonym w odbiorcy. ComponentId jest użyte jako właściwość wiadomości, co pozwala na odebranie jej przez odpowiedni komponent.

W końcowej implemetacji dodano możliwość bezpiecznego wywołania tej metody przed inicjalizacją komponentu: wiadomości są buforowane i wysyłane pod koniec wywołania metody init().

Odbieranie

tryb pull

CommunicationServiceMessage receive(String componentId)

Komponent komunikacyjny przy pomocy localEndpoint pobiera z lokalnej kolejki wiadomość zgodną z podanym componentId. Jeśli komoponent nie został zainicjalizowany metoda zwraca null (dodano w końcowej implementacji).

tryb push

(usunięty w ostatecznej wersji API, pozostawiony w dokumentacji jako propozycja alternatywna)

void register(ICommunicationServiceMessageListener listener);
void unregister(ICommunicationServiceMessageListener listener);

Ze względu na prostotę implementacji, działanie nie zostało przedstawione na diagramach. Listener rejestruje się w komponencie komunikacyjnym, który przekazuje referencje do obiektu localEndpoint, który powiadamia wszystkich zarejestrowanych o nadejściu nowej wiadomości.

Broker Structure

Komunikacja w obrębie platformy JAgE musi być jak najbardziej efektywna. W tym celu zrezygnowaliśmy z architektury z jednym globalnym brokerem na rzecz wielu lokalnych brokerów.

Czym jest broker?

Broker jest serwisem zarządzającym kolejkami wiadomości. W obrębie jednego bokera możliwe jest tworzenie wielu kolejek wiadomości.

Broker w AgE

Serwis brokera będzie automatycznie startowany przy ładowaniu przez środowisko JAgE komponentu komunikacyjnego CommunicationComponent?. W konfiguracji komponentu będzie podany adres i port na jakich będzie nasłuchiwał broker. Uruchamiany jest więc jeden broker per CommunicationComponent?. Będzie to architektura z tzw. embedded brokerem. Istnienie brokera jest kompletnie przezroczyste dla użytkownika platformy JAgE. Startowany jest on automatycznie i nie wymaga żadnej konfiguracji.

Kolejki jako skrzynki odbiorcze

W każdym brokerze istnieją kolejki komunikatów jednoznacznie powiązane przez nazwę z danym komponentem platformy. Każdy komponent platformy JAgE posiada więc w obrębie swojego lokalnego brokera kolejkę komunikatów które zostały przesłane do niego.

JNDI

Ponieważ każdy broker jest lokalny dla jednego workplace'a, musi istnieć możliwość pozyskiwania referencji do zdalnych brokerów. Zapewni to usługa JNDI w której rejestrować się będzie każdy nowo powstały broker. Usługa JNDI jest niezależna od platformy JAgE, istnieje poza platformą i jest wspólna dla wszystkich węzłów sieci.

Komunikacja

Chcąc wysłać wiadomość na adres node:Component komponent komunikacyjny pobiera z JNDI zdalnego brokera, który jest identyfikowany przez adres noda i pobiera z niego kolejkę identyfikowaną przez nazwę Component. Następnie umieszcza wiadomość w pozyskanej kolejce. Mechanizm ActiveMQ automatycznie wywołuje wtedy metodę onMEssage() odpowiedniego komponentu na docelowym węźle

Attachments:

class-diargam.jpg (image/jpeg)
deployment-diagram.jpg (image/jpeg)
component-diagram.jpg (image/jpeg)
init.jpg (image/jpeg)
receive.jpg (image/jpeg)
send.jpg (image/jpeg)