AgE 2.5 : Communication based on RMI - User documentation

Dokumentacja dla CommunicationService dalej nazywanego w skrócie CS. Pojedynczą instancję programu wykorzystującego CS nazywamy "węzłem".

Instalacja/Uruchomienie

Wymagania

Sieć - CS przystosowane jest do pracy w sieci prywatnej, (PAT/NAT uniemożliwiają poprawne działanie), jakiekolwiek rutery pomiędzy węzłami powinny forwardować pakiety grupy multicastowej (patrz konfiguracja) 1.1.2 RMI - CS do poprawnego działania wymaga rmiregistry. Warunkiem koniecznym do kontaktowania się dwóch węzłów jest to czy znają położenie remregistry z którym jest powiązany NodeConnector danego węzła. Są trzy sposoby konfiguracji współpracy CS z rmiregistry:

  1. Globalne rmiregistry - global
    Użytkownik musi uruchomić jedno rmiregisty, jego adres i port jest podawany w konfiguracji i wszystkie węzły z niego korzystają. Należy pamiętać o tym żeby rmiregisty miało w CLASSPATH dostęp do plików .class pakietów org.jage.platform.communication2.communicationService.*
  2. Rmiregistry na każdy węzeł - one-to-one
    Każdy węzeł uruchamia i korzysta z własnego rmiregisty, nie wymaga od użytkownika dodatkowych działań. Należy pamiętać że tryb ten wymaga włączonego rozgłaszania na wszystkich węzłach
  3. Mieszany
    Można mieszać dwa powyższe sposoby. Np. n węzłów działa w trybie one-to-one a k węzłów używa jednego rmiregistry, Jeżeli węzły pracujące w trybie global nie mają wyłączonego_' rozgłaszania, wszystkie węzły na starcie będą miały pełną informację o środowisku. W przeciwnym razie (gdy węzły w trybie global mają '_wyłączone rozgłaszanie ) nie ma pewności, że pozostałe węzły dowiedzą się o węzłach z grupy global.

Instalacja

Instalacja oprócz podania w CLASSPATH jar-a z CS, wymaga dodania wpisu w konfiguracji uruchomieniowej węzła. Najprostsza postać konfiguracji

  <component name="CommunicationService" class="org.jage.platform.communication2.CommunicationService"
		isSingleton="true" />

Uruchomienie

Proces uruchamiania przeprowadzany jest przez funkcję init(), wywoływaną przez szynę platformy jAgE. składa się z następujących faz:

  • inicjalizacja - inicjalizowane są wewnętrzne struktury, uruchamiany jest wątek CS
  • rejestrowanie - zarejestrowanie Connectora w rmiregistry, od tej pory węzeł jest widoczny na zewnątrz, może przyjmować wiadomości. Nie ma pewności czy komponent do którego ma trafić wiadomość w tej i następnych fazach został już zainicjalizowany funkcja init(), może to doprowadzić do błędu.
  • rozgłaszanie - węzeł wysyła wiadomości ,informując inne węzły o swoim istnieniu, poprzez UDP na wskazany w konfiguracji port i grupę multicastową. Jeżeli pracują już jakieś węzły przekażą nowemu informacje o wszystkich znanym im węzłach. Należy zadbać o to żeby wszystkie węzły miały ustawiony ten sam port i grupę multicastową dla rozgłaszania, w przeciwnym razie węzły nie dowiedzą się o swoim wzajemnym istnieniu. W przypadku konfiguracji z globalnym rmiregistry fazę rozgłaszania można pominąć ustawiając 0 dla broadcastCount,

Uruchomiony CS nasłuchuję na okoliczność pojawienia się nowych węzłów, odbiera wiadomości od innych węzłów i przekazuję je do klientów. Wiadomości do wysłania agreguję w paczki, które po spełnieniu kryterium są wysyłane do docelowego węzła a tam dostarczane do odbiorców. W przypadku stwierdzenia śmierci węzła - wszystkie wiadomości oczekujące na wysłanie do tego węzła są usuwane.

Konfiguracja

Dostępna jest konfiguracja podstawowych funkcjonalności CS, konfiguracji dokonujemy za pośrednictwem tagów "property" np.

<component name="CommunicationService" class="org.jage.platform.communication2.CommunicationService"
		isSingleton="true" >
		
		<property name="host">
			<value class="String" value=""/>
		</property>
		
		<property name="port">
			<value class="Integer" value="12000"/>
		</property>	
</component>

Zestawienie opcji konfiguracyjnych

name

class

opis

wartość domyślna

host

String

Nazwa hosta na którym jest uruchomiony rmiregisty, może być nazwa jak i adres ipv4, podanie pustego Stringu ("") powoduje uruchomienie własnego rmiregistry

"" - pusty string

port

Integer

port na którym nasłuchuje rmiregistry, w przypadku uruchamiania własnego rmiregistry jest to port na którym to rmiregisty będzie nasłuchiwać, podanie 0 powoduje przydzielenie wolnego portu

0

broadcastPort

Integer

port na którym jest ogłaszany nowo uruchomiony węzeł

5005

broadcastGroup

String

adres ipv4 grupy multicastowej na który ogłasza się nowo uruchomiony węzeł

230.0.0.1

broadcastCount

Integer

liczba pakietów wysyłanych podczas fazy rozgłaszania przez nowo uruchomiony węzeł, podanie 0 powoduje pominięcie tej fazy

3

broadcastDelay

Integer

odstęp w czasie w ms pomiędzy pakietami podczas rozgłaszania

3000

maxConnectionFail

Integer

maksymalna liczba nieudanych prób komunikacji z zdalnym węzłem, po której węzeł jest uznawany za martwy

5

minPayloadSize

Integer

minimalna wielkość zagregowanych wiadomości, powyżej której wiadomości są wysyłane( w bajtach)

1048576

Użycie

SC dostarcza funkcjonalności umożliwiających wysyłanie i odbieranie wiadomości do komponentów znajdujących się na innych węzłach

Pozyskanie referencji na CS

Referencje można pobrać za pomocą funkcji getInstance(String name) z interfejsu IComponentInstanceProvider, podając name zgodne z tym w pliku konfiguracyjnym. Przykład:

  // age.xml
  <configuration>
    <component name="CommunicationService" class="org.jage.platform.communication2.CommunicationService"
		isSingleton="true" />
  </configuration>

  // SomeComponent.java
  class SomeComponent implements IPlatformComponent{
    ICommunicationService service;

    public void setComponentEnvironment(IComponentEnvironment env) {
	service =(ICommunicationService) env.getInstance("CommunicationService");	
    }
    
    ...
 }

Wysłanie wiadomości.

Wysyłanie odbywa się poprzez metodę send interfejsu ICommunicationService.

Użytkownik musi dostarczyć wiadomość składającą się z :

  • Nagłówka - przechowuje informację o odbiorcy i nadawcy wiadomości. Poprawny nagłówek musi zwracać componentId nadawcy i selektor dostarczający niepustą kolekcję odbiorców
    • Odbiorca wiadomości: componentId odbiorcy jest pobierany z adresu przechowywanego w selektorze, jeśli nie jest on tam ustawiony (== null) to componentId odbiorcy jest taki sam jak componentId nadawcy. Selector jest inicjowany przez CS kolekcją wszystkich znanych mu działających węzłów (argument allAddresses metody initialize, jako drugi argument podawany jest null). Kolekcja ta ma formę zbiory IComponentAddress w którym componentId jest null. Wszelkie błędy rzucane podczas inicjowania selektora są ignorowane.
    • Nadawca wiadomości: pole nodeAddres nie jest używane. Wraz CS dostarczone są implementację nagłówków:
    • UnicastHeader - wysyła wiadomość do konkretnego komponentu na konkretnym węźle. Konstruktor przyjmuję Stringa będącego componentId nadawcy i jednocześnie odbiorcy i obiekt implementujący INodeAddress który wskazuję węzeł odbiorcy (Uwaga: Drugi konstruktor umożliwia ustawienie innego componentId odbiorcy niż componentId nadawcy)
  • Treści - informacji właściwej którą komponent chcę przesłać, w tej implementacji wymagane jest aby obiekt ten implementował interfejs Serializable. Jakiekolwiek błędy w nagłówku lub nie implementowanie Serializable przez "treść" powoduje nie wysłanie wiadomości, błąd nie jest zgłaszany komponentowi wysyłającemu, błąd jest natomiast logowany.

Przykład użycia:

  ...
  IICommunicationService cs;
  
  //uzyskanie cs
  ...

  // kompletowanie nagłówka
  INodeAddres addr = new NodeAddress("znany_nam_adres_węzła");
  IHeader<IComponentAddress> header = new UnicastHeader("znany_nam_component_id", addr);

  // wiadomość przesyłająca Stringa do komponentu :znany_nam_component_id na węźle znany_nam_adres_węzła
  Message<IComponentAddress> message;
  message = new Message<IComponentAddress>(header, new String("hello world"));
  //wysłanie
  cs.send(message);

BroadcastHeader - wysyła wiadomość do konkretnego komponentu na wszystkich węzłach znanym danemu CS (oprócz jego samego) Konstruktor przyjmuję Stringa będącego componentId nadawcy i odbiorcy wiadomości.

Przykład użycia:

  ...
  IICommunicationService cs;
  
  //uzyskanie cs
  ...

  // nagłówek
  IHeader<IComponentAddress> header = new BroadcastHeader("znany_nam_component_id");

  // wiadomość przesyłająca Stringa do komponentu :znany_nam_component_id na wszystkich węzłach, znanych używanemu CS
  Message<IComponentAddress> message;
  message = new Message<IComponentAddress>(header, new String("hello world"));
  //wysłanie
  cs.send(message);

Odbieranie wiadomości.

Odbierać wiadomości można na dwa sposoby:

  1. push - CS odszukuję dany componentId za pomocą getInstance() z interfejsu IComponentInstanceProvider, jeżeli znaleziony komponent implementuje interfejs
    public interface IMessageListener<A extends IAddress>{
      void deliver(Message<A> message);
    }
    
    zostanie wywołana metoda deliver przekazując wiadomość. W przeciwnym razie wiadomość zostanie zapamiętana i może zostać odebrana metodą pull
  2. pull - klient może odebrać wiadomość bezpośrednio z CS, wywołując metodę receive() podając swój compondntId. Metoda ta zwraca najstarszą nieodebraną dotąd wiadomość dla danego komponentu lub null jeśli nie ma żadnej wiadomości. Dostarczone wiadomości powoduję usunięcie jej bufora CS.
    Odebrana wiadomość nie przechowuję informacji o odbiorcach nadanej podczas wysyłania. Czyli nie dowiemy się z nagłówka odebranej wiadomości do jakich jeszcze innych komponentów/węzłów została wysłana. Pole odbiorcy jest unicastowym selektorem zawierającym ComponentAddres wskazujący na componentId i adres węzła odbiorcy.

Attachments:

rmi-one-to-one.png (image/png)
rmi-global.png (image/png)
api.png (image/png)