Dokumentacja dla CommunicationService
dalej nazywanego w skrócie CS. Pojedynczą instancję programu wykorzystującego CS nazywamy "węzłem".
Instalacja/Uruchomienie
Wymagania
Sieć - CS przystosowane jest do pracy w sieci prywatnej, (PAT/NAT uniemożliwiają poprawne działanie), jakiekolwiek rutery pomiędzy węzłami powinny forwardować pakiety grupy multicastowej (patrz konfiguracja) 1.1.2 RMI - CS do poprawnego działania wymaga rmiregistry. Warunkiem koniecznym do kontaktowania się dwóch węzłów jest to czy znają położenie remregistry z którym jest powiązany NodeConnector danego węzła. Są trzy sposoby konfiguracji współpracy CS z rmiregistry:
- Globalne rmiregistry - global
Użytkownik musi uruchomić jedno rmiregisty, jego adres i port jest podawany w konfiguracji i wszystkie węzły z niego korzystają. Należy pamiętać o tym żeby rmiregisty miało w CLASSPATH dostęp do plików .class pakietów org.jage.platform.communication2.communicationService.* - Rmiregistry na każdy węzeł - one-to-one
Każdy węzeł uruchamia i korzysta z własnego rmiregisty, nie wymaga od użytkownika dodatkowych działań. Należy pamiętać że tryb ten wymaga włączonego rozgłaszania na wszystkich węzłach - Mieszany
Można mieszać dwa powyższe sposoby. Np. n węzłów działa w trybie one-to-one a k węzłów używa jednego rmiregistry, Jeżeli węzły pracujące w trybie global nie mają wyłączonego_' rozgłaszania, wszystkie węzły na starcie będą miały pełną informację o środowisku. W przeciwnym razie (gdy węzły w trybie global mają '_wyłączone rozgłaszanie ) nie ma pewności, że pozostałe węzły dowiedzą się o węzłach z grupy global.
Instalacja
Instalacja oprócz podania w CLASSPATH jar-a z CS, wymaga dodania wpisu w konfiguracji uruchomieniowej węzła. Najprostsza postać konfiguracji
<component name="CommunicationService" class="org.jage.platform.communication2.CommunicationService" isSingleton="true" />
Uruchomienie
Proces uruchamiania przeprowadzany jest przez funkcję init(), wywoływaną przez szynę platformy jAgE. składa się z następujących faz:
- inicjalizacja - inicjalizowane są wewnętrzne struktury, uruchamiany jest wątek CS
- rejestrowanie - zarejestrowanie Connectora w rmiregistry, od tej pory węzeł jest widoczny na zewnątrz, może przyjmować wiadomości. Nie ma pewności czy komponent do którego ma trafić wiadomość w tej i następnych fazach został już zainicjalizowany funkcja init(), może to doprowadzić do błędu.
- rozgłaszanie - węzeł wysyła wiadomości ,informując inne węzły o swoim istnieniu, poprzez UDP na wskazany w konfiguracji port i grupę multicastową. Jeżeli pracują już jakieś węzły przekażą nowemu informacje o wszystkich znanym im węzłach. Należy zadbać o to żeby wszystkie węzły miały ustawiony ten sam port i grupę multicastową dla rozgłaszania, w przeciwnym razie węzły nie dowiedzą się o swoim wzajemnym istnieniu. W przypadku konfiguracji z globalnym rmiregistry fazę rozgłaszania można pominąć ustawiając 0 dla broadcastCount,
Uruchomiony CS nasłuchuję na okoliczność pojawienia się nowych węzłów, odbiera wiadomości od innych węzłów i przekazuję je do klientów. Wiadomości do wysłania agreguję w paczki, które po spełnieniu kryterium są wysyłane do docelowego węzła a tam dostarczane do odbiorców. W przypadku stwierdzenia śmierci węzła - wszystkie wiadomości oczekujące na wysłanie do tego węzła są usuwane.
Konfiguracja
Dostępna jest konfiguracja podstawowych funkcjonalności CS, konfiguracji dokonujemy za pośrednictwem tagów "property" np.
<component name="CommunicationService" class="org.jage.platform.communication2.CommunicationService" isSingleton="true" > <property name="host"> <value class="String" value=""/> </property> <property name="port"> <value class="Integer" value="12000"/> </property> </component>
Zestawienie opcji konfiguracyjnych
name |
class |
opis |
wartość domyślna |
---|---|---|---|
host |
String |
Nazwa hosta na którym jest uruchomiony rmiregisty, może być nazwa jak i adres ipv4, podanie pustego Stringu ("") powoduje uruchomienie własnego rmiregistry |
"" - pusty string |
port |
Integer |
port na którym nasłuchuje rmiregistry, w przypadku uruchamiania własnego rmiregistry jest to port na którym to rmiregisty będzie nasłuchiwać, podanie 0 powoduje przydzielenie wolnego portu |
0 |
broadcastPort |
Integer |
port na którym jest ogłaszany nowo uruchomiony węzeł |
5005 |
broadcastGroup |
String |
adres ipv4 grupy multicastowej na który ogłasza się nowo uruchomiony węzeł |
230.0.0.1 |
broadcastCount |
Integer |
liczba pakietów wysyłanych podczas fazy rozgłaszania przez nowo uruchomiony węzeł, podanie 0 powoduje pominięcie tej fazy |
3 |
broadcastDelay |
Integer |
odstęp w czasie w ms pomiędzy pakietami podczas rozgłaszania |
3000 |
maxConnectionFail |
Integer |
maksymalna liczba nieudanych prób komunikacji z zdalnym węzłem, po której węzeł jest uznawany za martwy |
5 |
minPayloadSize |
Integer |
minimalna wielkość zagregowanych wiadomości, powyżej której wiadomości są wysyłane( w bajtach) |
1048576 |
Użycie
SC dostarcza funkcjonalności umożliwiających wysyłanie i odbieranie wiadomości do komponentów znajdujących się na innych węzłach
Pozyskanie referencji na CS
Referencje można pobrać za pomocą funkcji getInstance(String name) z interfejsu IComponentInstanceProvider, podając name zgodne z tym w pliku konfiguracyjnym. Przykład:
// age.xml <configuration> <component name="CommunicationService" class="org.jage.platform.communication2.CommunicationService" isSingleton="true" /> </configuration> // SomeComponent.java class SomeComponent implements IPlatformComponent{ ICommunicationService service; public void setComponentEnvironment(IComponentEnvironment env) { service =(ICommunicationService) env.getInstance("CommunicationService"); } ... }
Wysłanie wiadomości.
Wysyłanie odbywa się poprzez metodę send interfejsu ICommunicationService.
Użytkownik musi dostarczyć wiadomość składającą się z :
- Nagłówka - przechowuje informację o odbiorcy i nadawcy wiadomości. Poprawny nagłówek musi zwracać componentId nadawcy i selektor dostarczający niepustą kolekcję odbiorców
- Odbiorca wiadomości: componentId odbiorcy jest pobierany z adresu przechowywanego w selektorze, jeśli nie jest on tam ustawiony (== null) to componentId odbiorcy jest taki sam jak componentId nadawcy. Selector jest inicjowany przez CS kolekcją wszystkich znanych mu działających węzłów (argument allAddresses metody initialize, jako drugi argument podawany jest
null
). Kolekcja ta ma formę zbiory IComponentAddress w którym componentId jest null. Wszelkie błędy rzucane podczas inicjowania selektora są ignorowane. - Nadawca wiadomości: pole nodeAddres nie jest używane. Wraz CS dostarczone są implementację nagłówków:
- UnicastHeader - wysyła wiadomość do konkretnego komponentu na konkretnym węźle. Konstruktor przyjmuję Stringa będącego componentId nadawcy i jednocześnie odbiorcy i obiekt implementujący INodeAddress który wskazuję węzeł odbiorcy (Uwaga: Drugi konstruktor umożliwia ustawienie innego componentId odbiorcy niż componentId nadawcy)
- Odbiorca wiadomości: componentId odbiorcy jest pobierany z adresu przechowywanego w selektorze, jeśli nie jest on tam ustawiony (== null) to componentId odbiorcy jest taki sam jak componentId nadawcy. Selector jest inicjowany przez CS kolekcją wszystkich znanych mu działających węzłów (argument allAddresses metody initialize, jako drugi argument podawany jest
- Treści - informacji właściwej którą komponent chcę przesłać, w tej implementacji wymagane jest aby obiekt ten implementował interfejs Serializable. Jakiekolwiek błędy w nagłówku lub nie implementowanie Serializable przez "treść" powoduje nie wysłanie wiadomości, błąd nie jest zgłaszany komponentowi wysyłającemu, błąd jest natomiast logowany.
Przykład użycia:
... IICommunicationService cs; //uzyskanie cs ... // kompletowanie nagłówka INodeAddres addr = new NodeAddress("znany_nam_adres_węzła"); IHeader<IComponentAddress> header = new UnicastHeader("znany_nam_component_id", addr); // wiadomość przesyłająca Stringa do komponentu :znany_nam_component_id na węźle znany_nam_adres_węzła Message<IComponentAddress> message; message = new Message<IComponentAddress>(header, new String("hello world")); //wysłanie cs.send(message);
BroadcastHeader - wysyła wiadomość do konkretnego komponentu na wszystkich węzłach znanym danemu CS (oprócz jego samego) Konstruktor przyjmuję Stringa będącego componentId nadawcy i odbiorcy wiadomości.
Przykład użycia:
... IICommunicationService cs; //uzyskanie cs ... // nagłówek IHeader<IComponentAddress> header = new BroadcastHeader("znany_nam_component_id"); // wiadomość przesyłająca Stringa do komponentu :znany_nam_component_id na wszystkich węzłach, znanych używanemu CS Message<IComponentAddress> message; message = new Message<IComponentAddress>(header, new String("hello world")); //wysłanie cs.send(message);
Odbieranie wiadomości.
Odbierać wiadomości można na dwa sposoby:
- push - CS odszukuję dany componentId za pomocą getInstance() z interfejsu IComponentInstanceProvider, jeżeli znaleziony komponent implementuje interfejs
zostanie wywołana metoda deliver przekazując wiadomość. W przeciwnym razie wiadomość zostanie zapamiętana i może zostać odebrana metodą pull
public interface IMessageListener<A extends IAddress>{ void deliver(Message<A> message); }
- pull - klient może odebrać wiadomość bezpośrednio z CS, wywołując metodę receive() podając swój compondntId. Metoda ta zwraca najstarszą nieodebraną dotąd wiadomość dla danego komponentu lub null jeśli nie ma żadnej wiadomości. Dostarczone wiadomości powoduję usunięcie jej bufora CS.
Odebrana wiadomość nie przechowuję informacji o odbiorcach nadanej podczas wysyłania. Czyli nie dowiemy się z nagłówka odebranej wiadomości do jakich jeszcze innych komponentów/węzłów została wysłana. Pole odbiorcy jest unicastowym selektorem zawierającym ComponentAddres wskazujący na componentId i adres węzła odbiorcy.