Hello Crossbar team,
We decided to make our node in java, everything works fine, till client loses connection to server.
I’we seen the reconnect example in android example but I couldn’t recreate this for netty, so I wrote my own reconnecting mechanism but sometimes produces two sessions after connecting.
Here is the snippet of the code:
main.java:
package ledinek.control;
import java.io.IOException;
import ledinek.control.helpers.PropUtils;
import ledinek.control.ws.WebStatusModel;
import ledinek.control.ws.WsConnection;
import org.apache.log4j.Logger;public class Main {
static Logger logger = Logger.getLogger(Main.class);public static void main(String[] args) { logger.info("Entering application."); try { PropUtils.load(System.getenv("PROPERTIES")); } catch (IOException e) { e.printStackTrace(); } //BasicConfigurator.configure(); //logger.setLevel(Level.OFF); WebStatusModel webstatus = new WebStatusModel(); Object wssync = new Object(); String ws_url = PropUtils.properties.getProperty("websocket-url"); String ws_realm = PropUtils.properties.getProperty("websocket-realm"); WsConnection backendComm = new WsConnection(ws_url, ws_realm, webstatus, wssync); final Thread wsc = new Thread(backendComm); wsc.start(); }
}
WsConnection.java
package ledinek.control.ws;
import com.google.gson.Gson; import com.google.gson.JsonObject; import io.crossbar.autobahn.wamp.Client; import io.crossbar.autobahn.wamp.Session; import io.crossbar.autobahn.wamp.types.*; import java.util.concurrent.TimeUnit; import java.net.URI; import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.logging.Logger; public class WsConnection implements Runnable { private static final Logger LOGGER = Logger.getLogger(WsConnection.class.getName()); private String url; private String realm; private CompletableFuture<ExitInfo> connection; private Session session = null; private final PublishOptions publishOptions = new PublishOptions(true, false); private boolean connected = false; private int retry_attempts = 0; private int retry_sleep = 0; private int sleep = 0; private WebStatusModel webstatus; private Gson gson = new Gson(); private URI uri; private Object wssync; private String prefix = "com.ledinek.control."; private Client client; public WsConnection(String url, String realm, WebStatusModel webstatus, Object wssync) { this.url = url; this.realm = realm; this.webstatus = webstatus; this.wssync = wssync; } @Override public void run() { while(true) { try { // reconnecting if (!connected){ if (retry_attempts == 0 ){ System.out.println("Connecting..."); } else { if (retry_sleep > 6){retry_sleep = 1;} sleep = retry_sleep * retry_sleep; System.out.println("Connection attempt " + retry_attempts + " sleeping for " + sleep + " seconds"); } retry_attempts++; retry_sleep++; System.out.println("Make new session"); this.session = null; this.session = new Session(); this.client = null; this.client = new Client(session, url, realm); this.session.addOnConnectListener(this::onConnect); this.session.addOnJoinListener(this::onJoin); this.session.addOnLeaveListener(this::onLeave); this.session.addOnDisconnectListener(this::onDisconnect); this.connection = null; this.connection = this.client.connect(); } if (!connected){ Thread.sleep(1000 * retry_sleep); } else { Thread.sleep(1000); } } catch (InterruptedException e) { e.printStackTrace(); } } } /* * Websocket client */ private void onJoin(Session session, SessionDetails details) { this.session = session; this.retry_attempts = 0; this.retry_sleep=1; // Send to event_log LOGGER.info("Published restart to event_log"); this.publish("event_log", "error","Control system restarted."); /* * RPC registrations */ CompletableFuture<Registration> order_to_produce = session.register(prefix + "order_to_produce", this::order_to_produce); order_to_produce.thenAccept(reg -> LOGGER.info("Registered procedure: " + prefix + "order_to_produce")); /* * ADD SERVICE ON BACKEND COMPONENT */ try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } CompletableFuture<CallResult> add_service = session.call("com.ledinek.backend.add_service", "control", session.getID()); add_service.thenAccept(callResult ->{ // System.out.println(callResult.results.get(0).getClass()); if (callResult.results.get(0).equals(true)){ LOGGER.info("Service successfully added to list of services"); } else { LOGGER.warning("Service is already added to list of online services. EXIT!"); } }).exceptionally(ex -> { System.out.println(ex.getMessage()); return null; }); } private void onConnect(Session session) { connected = true; retry_attempts = 1; LOGGER.info("Session connected, ID=" + session.getID()); } private void onLeave(Session session, CloseDetails detail) { connected = false; LOGGER.info(String.format("Left reason=%s, message=%s", detail.reason, detail.message)); } private void onDisconnect(Session session, boolean wasClean) { connected = false; System.out.println(String.format("Session with ID=%s, disconnected.", session.getID())); } public void publish(String topic, String type, String message){ CompletableFuture<Publication> p = session.publish(prefix + topic, type, message,System.currentTimeMillis() / 1000L, UUID.randomUUID().toString(), publishOptions); p.whenComplete((publication, throwable) -> { if (throwable != null) { // LOGGER.info(String.format("published to %s with message %s ", prefix + topic, message)); LOGGER.warning(String.format(" ERROR - published to %s with message %s - FAILED !", prefix + topic, message)); } }); } /* * RPC FUNCTIONS * ( you need to register them in function onJoin ) */ /** * @apiNote Call URI - com.ledinek.control.order_to_produce * * @param args - [ float maxMoistureLimit, float minMoistureLimit ] * @param details - call details * @return - Success / Fail * * curl -H "Content-Type: application/json" -d '{"procedure": "com.ledinek.control.order_to_produce": [0.12, 0.15]}' localhost:8080/caller */ //TODO [ rpc on db_component that calls this function ] private String order_to_produce(List<LinkedHashMap> args, InvocationDetails details) { /* frontend call const response = yield saveItem('/api/v1/orders/all/check_to_production/', 'POST', a.items.map((item) => item.id)); */ try { Order order = null; synchronized(this.wssync) { order = this.gson.fromJson(gson.toJson(args.get(0)), Order.class); System.out.println(order.id); } if (order != null) { System.out.println("Produce order"); } return "order_to_produce: True"; } catch (Exception e) { e.printStackTrace(); } return "order_to_produce: False"; } }
and terminal output:
Connecting…
Make new session
Init line started
Trying to connect to PLC: 10.10.12.10 Init line finished
Connection attempt 1 sleeping for 1 seconds
Make new session
Apr 13, 2018 11:43:49 AM ledinek.control.ws.WsConnection onConnect
INFO: Session connected, ID=0
Apr 13, 2018 11:43:49 AM ledinek.control.ws.WsConnection onConnect
INFO: Session connected, ID=0
…
Apr 13, 2018 11:44:54 AM io.crossbar.autobahn.wamp.transports.NettyWebSocketClientHandler
INFO: WebSocket Client disconnected!
Apr 13, 2018 11:44:54 AM io.crossbar.autobahn.wamp.transports.NettyWebSocketClientHandler
INFO: WebSocket Client disconnected!
Session with ID=7959461717222276, disconnected.
Session with ID=5698032712161101, disconnected.
Any idea how this could be fixed ?
Regards,
Marko