Tuesday, February 17, 2015

Two-way TCP Transport in WSO2 ESB

WSO2 ESB will support a two-way TCP transport in an upcoming release. It is not available yet, although the existing WSO2 ESB TCP transport already provides generic TCP support. With the two-way transport, a client can send messages to a TCP proxy and receive the responses over the same TCP connection. Because TCP delivers a continuous stream of bytes, the transport has to determine where each message that is to be mediated through the ESB ends. The implementation will therefore support splitting messages by a single character, by a sequence of characters, by a fixed message length, or by a special character given in hex form. The client can also choose the input type it uses when sending requests to the TCP proxy; for now the available input types are binary and string. Splitting the message by a single character is the most efficient option.
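
To make the idea of splitting by a single character concrete, here is a minimal sketch in Java. It is not the ESB implementation; the class name DelimiterSplitter and its split method are made up for illustration, and it works on an in-memory buffer rather than a live socket to keep it short.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not part of WSO2 ESB.
final class DelimiterSplitter {

    // Returns every record in the buffer that is terminated by the delimiter byte.
    // Bytes after the last delimiter form an incomplete record and are left out.
    static List<String> split(byte[] data, byte delimiter) {
        List<String> records = new ArrayList<>();
        int start = 0;
        for (int i = 0; i < data.length; i++) {
            if (data[i] == delimiter) {
                records.add(new String(data, start, i - start, StandardCharsets.UTF_8));
                start = i + 1;
            }
        }
        return records;
    }

    public static void main(String[] args) {
        // Two messages sent back to back on one connection, each ending with '|'.
        byte[] wire = "<a/>|<b/>|".getBytes(StandardCharsets.UTF_8);
        System.out.println(split(wire, (byte) '|'));
    }
}
```

A single pass with one byte comparison per position is all that is needed here, which is why the single-character case is the cheapest of the splitting modes.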


Available properties.


  • "transport.tcp.port" - TCP port
  • "transport.tcp.contentType" - Content type of the input message
  • "transport.tcp.recordDelimiter" - Record delimiter
  • "transport.tcp.recordDelimiterType" - Type of delimiter (string, character, byte)
  • "transport.tcp.recordLength" - Length of each message to be split off. If this is set, the delimiter properties are ignored
  • "transport.tcp.responseClient" - Set this if the client needs to receive the response
  • "transport.tcp.inputType" - Input type of the message (string, binary)
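
None of the sample proxies in this post uses "transport.tcp.recordLength", so here is a rough sketch of what length-based splitting means. The class FixedLengthSplitter is hypothetical and only illustrates the idea on an in-memory buffer; how the transport handles a short trailing chunk is an assumption on my part.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not part of WSO2 ESB.
final class FixedLengthSplitter {

    // Cuts the buffer into records of exactly recordLength bytes.
    // A shorter trailing chunk is treated as incomplete and left out (assumption).
    static List<String> split(byte[] data, int recordLength) {
        if (recordLength <= 0) {
            throw new IllegalArgumentException("recordLength must be positive");
        }
        List<String> records = new ArrayList<>();
        for (int start = 0; start + recordLength <= data.length; start += recordLength) {
            records.add(new String(data, start, recordLength, StandardCharsets.UTF_8));
        }
        return records;
    }

    public static void main(String[] args) {
        byte[] wire = "AAAABBBBCC".getBytes(StandardCharsets.UTF_8);
        System.out.println(split(wire, 4));
    }
}
```

With a fixed length no delimiter is needed at all, which matches the note above that the delimiter properties are ignored once the record length is set.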


The sample proxy below splits messages on a single character. It expects a message with an empty body, enriches the body with the IBM symbol, and then forwards it to an HTTP endpoint.


Request message


<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/"><soapenv:Header/><soapenv:Body/></soapenv:Envelope>

 <proxy name="TCPProxy" 
          transports="tcp" 
          startOnLoad="true" 
          trace="disable"> 
      <description/> 
      <target> 
         <inSequence> 
            <property name="symbol" value="IBM" scope="default" type="STRING"/> 
            <enrich> 
               <source type="inline" clone="true"> 
                  <m:getQuote xmlns:m="http://services.samples"> 
                     <m:request> 
                        <m:symbol>?</m:symbol> 
                     </m:request> 
                  </m:getQuote> 
               </source> 
               <target type="body"/> 
            </enrich> 
            <enrich> 
               <source type="property" clone="true" property="symbol"/> 
               <target xmlns:m="http://services.samples" xpath="//m:getQuote/m:request/m:symbol"/> 
            </enrich> 
            <log level="full" separator=","/> 
            <send> 
               <endpoint> 
                  <address uri="http://localhost:9000/services/SimpleStockQuoteService" format="soap11"/> 
               </endpoint> 
            </send> 
         </inSequence> 
         <outSequence> 
            <log level="full"/> 
            <send/> 
         </outSequence> 
      </target> 
      <parameter name="transport.tcp.responseClient">true</parameter> 
      <parameter name="transport.tcp.recordDelimiter">|</parameter> 
      <parameter name="transport.tcp.inputType">string</parameter> 
      <parameter name="transport.tcp.port">6789</parameter> 
      <parameter name="transport.tcp.recordDelimiterType">character</parameter> 
      <parameter name="transport.tcp.contentType">text/xml</parameter> 
   </proxy>

The proxy below splits input messages on a special character appended to the end of each message.

<proxy name="TCPProxy" 
          transports="tcp" 
          startOnLoad="true" 
          trace="disable"> 
      <description/> 
      <target> 
         <inSequence> 
            <property name="symbol" value="IBM" scope="default" type="STRING"/> 
            <enrich> 
               <source type="inline" clone="true"> 
                  <m:getQuote xmlns:m="http://services.samples"> 
                     <m:request> 
                        <m:symbol>?</m:symbol> 
                     </m:request> 
                  </m:getQuote> 
               </source> 
               <target type="body"/> 
            </enrich> 
            <enrich> 
               <source type="property" clone="true" property="symbol"/> 
               <target xmlns:m="http://services.samples" xpath="//m:getQuote/m:request/m:symbol"/> 
            </enrich> 
            <log level="full" separator=","/> 
            <send> 
               <endpoint> 
                  <address uri="http://localhost:9000/services/SimpleStockQuoteService" format="soap11"/> 
               </endpoint> 
            </send> 
         </inSequence> 
         <outSequence> 
            <log level="full"/> 
            <send/> 
         </outSequence> 
      </target> 
      <parameter name="transport.tcp.recordDelimiter">0x03</parameter> 
      <parameter name="transport.tcp.responseClient">true</parameter> 
      <parameter name="transport.tcp.inputType">binary</parameter> 
      <parameter name="transport.tcp.port">6789</parameter> 
      <parameter name="transport.tcp.recordDelimiterType">byte</parameter> 
      <parameter name="transport.tcp.contentType">text/xml</parameter> 
   </proxy>
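
On the client side, a hex value such as 0x03 has to end up as one raw byte at the end of each message. The sketch below shows one way to do that in Java. HexDelimiter, parse and frame are hypothetical names, and how the transport itself interprets the parameter value is not covered in this post, so treat the parsing as an assumption.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical helper for illustration only; not part of WSO2 ESB.
final class HexDelimiter {

    // Turns a string such as "0x03" into the single byte it denotes.
    static byte parse(String hex) {
        int value = Integer.decode(hex);
        if (value < 0 || value > 0xFF) {
            throw new IllegalArgumentException("not a single byte: " + hex);
        }
        return (byte) value;
    }

    // Appends the delimiter byte to the UTF-8 encoded payload.
    static byte[] frame(String payload, byte delimiter) {
        byte[] body = payload.getBytes(StandardCharsets.UTF_8);
        byte[] framed = Arrays.copyOf(body, body.length + 1);
        framed[body.length] = delimiter;
        return framed;
    }

    public static void main(String[] args) {
        byte delimiter = parse("0x03");
        byte[] framed = frame("<soapenv:Body/>", delimiter);
        System.out.println(framed.length + " bytes, last byte = " + framed[framed.length - 1]);
    }
}
```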

The proxy below splits messages on a sequence of characters.

<proxy name="TCPProxy" 
          transports="tcp" 
          startOnLoad="true" 
          trace="disable"> 
      <description/> 
      <target> 
         <inSequence> 
            <property name="symbol" value="IBM" scope="default" type="STRING"/> 
            <enrich> 
               <source type="inline" clone="true"> 
                  <m:getQuote xmlns:m="http://services.samples"> 
                     <m:request> 
                        <m:symbol>?</m:symbol> 
                     </m:request> 
                  </m:getQuote> 
               </source> 
               <target type="body"/> 
            </enrich> 
            <enrich> 
               <source type="property" clone="true" property="symbol"/> 
               <target xmlns:m="http://services.samples" xpath="//m:getQuote/m:request/m:symbol"/> 
            </enrich> 
            <log level="full" separator=","/> 
            <send> 
               <endpoint> 
                  <address uri="http://localhost:9000/services/SimpleStockQuoteService" format="soap11"/> 
               </endpoint> 
            </send> 
         </inSequence> 
         <outSequence> 
            <log level="full"/> 
            <send/> 
         </outSequence> 
      </target> 
      <parameter name="transport.tcp.responseClient">true</parameter> 
      <parameter name="transport.tcp.recordDelimiter">split</parameter>
      <parameter name="transport.tcp.inputType">string</parameter> 
      <parameter name="transport.tcp.port">6789</parameter> 
      <parameter name="transport.tcp.recordDelimiterType">string</parameter> 
      <parameter name="transport.tcp.contentType">text/xml</parameter> 
 </proxy>
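
Splitting on a sequence of characters means looking for the whole sequence rather than a single byte. Here is a minimal sketch, using the delimiter value "split" from the proxy above. SequenceSplitter is a hypothetical class written for this post, not the transport's code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not part of WSO2 ESB.
final class SequenceSplitter {

    // Returns every record that is terminated by the delimiter sequence.
    // Text after the last delimiter is an incomplete record and is left out.
    static List<String> split(String data, String delimiter) {
        if (delimiter.isEmpty()) {
            throw new IllegalArgumentException("delimiter must not be empty");
        }
        List<String> records = new ArrayList<>();
        int start = 0;
        int end;
        while ((end = data.indexOf(delimiter, start)) != -1) {
            records.add(data.substring(start, end));
            start = end + delimiter.length();
        }
        return records;
    }

    public static void main(String[] args) {
        System.out.println(split("<a/>split<b/>split", "split"));
    }
}
```

Matching a multi-character sequence takes more comparisons per position than matching one character, and the sequence must never occur inside a message body, so a single-character delimiter is usually the simpler choice when the payload allows it.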

Sample Java client for splitting on a special character. (A client for the character delimiter needs only small changes to the one below.)

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

public class TCPClient {

    // Must match transport.tcp.recordDelimiter in the proxy
    // (0x03 in the byte-delimiter proxy above).
    private static final int DELIMITER = 0x03;

    private final String host = "localhost";
    private final int port = 6789;
    private final Socket socket;

    public static void main(String[] args) throws Exception {
        String message = "<soapenv:Envelope xmlns:soapenv=\"http://schemas.xmlsoap.org/soap/envelope/\">"
                + "<soapenv:Header/><soapenv:Body/></soapenv:Envelope>" + (char) DELIMITER;
        TCPClient client = new TCPClient();
        try {
            // Send two requests over the same connection and read one response for each.
            client.sendToServer(message);
            client.receiveFromServer();
            client.sendToServer(message);
            client.receiveFromServer();
        } finally {
            client.close();
        }
    }

    TCPClient() throws IOException {
        socket = new Socket(host, port);
    }

    void sendToServer(String msg) throws IOException {
        // Write the message, including its trailing delimiter, to the socket.
        OutputStream outToServer = socket.getOutputStream();
        outToServer.write(msg.getBytes(StandardCharsets.UTF_8));
        outToServer.flush();
    }

    void receiveFromServer() throws IOException {
        // Read one response: everything up to the next delimiter or the end of the stream.
        InputStream inFromServer = socket.getInputStream();
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        int next;
        while ((next = inFromServer.read()) != -1 && next != DELIMITER) {
            bos.write(next);
        }
        System.out.println(new String(bos.toByteArray(), StandardCharsets.UTF_8));
    }

    void close() throws IOException {
        socket.close();
    }
}

17 comments:

  1. When I try to send a message over TCP, I get an exception: socket closed. Any idea what could be the cause?

    ERROR - TCPTransportSender Error while sending a TCP response
    java.net.SocketException: Socket is closed
    at java.net.Socket.getOutputStream(Socket.java:912)
    at org.apache.axis2.transport.tcp.TCPTransportSender.sendMessage(TCPTransportSender.java:67)
    at org.apache.axis2.transport.base.AbstractTransportSender.invoke(AbstractTransportSender.java:119)
    at org.apache.axis2.engine.AxisEngine.send(AxisEngine.java:442)
    at org.apache.synapse.core.axis2.Axis2Sender.sendBack(Axis2Sender.java:163)
    at org.apache.synapse.core.axis2.Axis2SynapseEnvironment.send(Axis2SynapseEnvironment.java:321)
    at org.apache.synapse.mediators.builtin.SendMediator.mediate(SendMediator.java:94)
    at org.apache.synapse.mediators.AbstractListMediator.mediate(AbstractListMediator.java:77)
    at org.apache.synapse.mediators.AbstractListMediator.mediate(AbstractListMediator.java:47)
    at org.apache.synapse.mediators.base.SequenceMediator.mediate(SequenceMediator.java:131)
    at org.apache.synapse.core.axis2.Axis2SynapseEnvironment.injectMessage(Axis2SynapseEnvironment.java:268)
    at org.apache.synapse.core.axis2.SynapseCallbackReceiver.handleMessage(SynapseCallbackReceiver.java:488)
    at org.apache.synapse.core.axis2.SynapseCallbackReceiver.receive(SynapseCallbackReceiver.java:170)
    at org.apache.axis2.engine.AxisEngine.receive(AxisEngine.java:180)
    at org.apache.synapse.transport.passthru.ClientWorker.run(ClientWorker.java:225)
    at org.apache.axis2.transport.base.threads.NativeWorkerPool$1.run(NativeWorkerPool.java:172)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)

  2. Did you try this feature? This is not available yet. :)

    Replies
    1. Sorry, I don't get you. What feature do you mean exactly?

    2. When do you get this error? :)

    3. I call the proxy from a Java client or telnet. The in flow works perfectly: I send the request to a mock service and get the response from the mock, but when I try to send this response to the client I get this error.

      This is my proxy:

      <proxy xmlns="http://ws.apache.org/ns/synapse" name="testTCP" transports="http https tcp" startOnLoad="true" trace="disable">
         <target>
            <inSequence>
               <log level="full"/>
               <property name="ClientApiNonBlocking" scope="axis2" action="remove"/>
               <send>
                  <endpoint>
                     <address uri="http://localhost:8099/mockitcard"/>
                  </endpoint>
               </send>
            </inSequence>
            <outSequence>
               <send/>
            </outSequence>
            <faultSequence/>
         </target>
         <parameter name="transport.tcp.responseClient">true</parameter>
         <parameter name="transport.tcp.port">6789</parameter>
      </proxy>

  3. This comment has been removed by the author.

  4. Is the client not getting any response at all, or does it get a response only once? In the current implementation, we close the socket after the response is sent back to the client.

  5. The client doesn't get any response. The error is in the send mediator in the outSequence.

  6. Yep, it's because the socket is already closed when the ESB tries to write back to the client. With the current implementation, you can only send one message from the client. Did your client close the connection after sending the message?

  7. No, my client doesn't close the connection after sending the message. The socket stays open and the client waits for the response from the ESB.

  8. If that's so, you should be able to get the first response. Can you post your client code?

  9. import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.io.ObjectInputStream;
    import java.io.OutputStreamWriter;
    import java.io.PrintWriter;
    import java.net.Socket;
    import java.net.UnknownHostException;


    public class Client {

        private final String HOST = "localhost";
        private final int PORT = 6789;
        private Socket socket = null;

        public Client() throws UnknownHostException, IOException {
            socket = new Socket(HOST, PORT);
        }

        public void sendMessage(String msg) throws IOException {
            DataOutputStream outToServer = new DataOutputStream(
                    socket.getOutputStream());

            outToServer.writeBytes(msg);
            outToServer.flush();
        }

        public void receiveFromServer() throws IOException, ClassNotFoundException {
            ObjectInputStream ois = new ObjectInputStream(socket.getInputStream());
            byte[] receivedMessage = (byte[]) ois.readObject();
        }

        public void close() {
            try {
                socket.close();
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }

        public static void main(String args[]) {
            try {
                Client client = new Client();
                String msg = ""
                        + "";
                client.sendMessage(msg);
                client.receiveFromServer();
                client.close();
            } catch (UnknownHostException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (ClassNotFoundException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }

    In my client I get the exception after outToServer.writeBytes(msg);
    When I try your client, the exception comes after outToServer.flush();

    Replies
    1. msg = "<soapenv:Envelope xmlns:soapenv=\"http://schemas.xmlsoap.org/soap/envelope/\"> "
      + "<soapenv:Header/> <soapenv:Body/> </soapenv:Envelope> ";

  10. Hi Greg,

    I tried it with the sample. Basically, what happens is that the server closes the connection right after the message is sent, so the connection is already closed when the client tries to read from it. The feature described above eliminates that problem. I can send you a jar to try out if you need it. :)

  11. That will be awesome :) I'll send you a message on Google +

  12. Great, you can reach me at harsz89@gmail.com
