Newer Version Available

This content describes an older version of this product. View Latest

Create a Java Client

Create a Java client that uses Bayeux and CometD to subscribe to the channel.
  1. Download and install the CometD and Jetty .jar files if necessary.
  2. In a new Java project, add the following code to a Java source file named StreamingClientExample.java. This code subscribes to the Streaming channel you created and listens for notifications. Depending on your Java development environment, you might have to rename this file and class to Main.
    1swfobject.registerObject("clippy.codeblock-0", "9");
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17package demo;
    18
    19import org.cometd.bayeux.Channel;
    20import org.cometd.bayeux.Message;
    21import org.cometd.bayeux.client.ClientSessionChannel;
    22import org.cometd.bayeux.client.ClientSessionChannel.MessageListener;
    23import org.cometd.client.BayeuxClient;
    24import org.cometd.client.transport.ClientTransport;
    25import org.cometd.client.transport.LongPollingTransport;
    26
    27import org.eclipse.jetty.client.ContentExchange;
    28import org.eclipse.jetty.client.HttpClient;
    29
    30import java.net.MalformedURLException;
    31import java.net.URL;
    32import java.util.HashMap;
    33import java.util.Map;
    34
    35/**
    36 * This example demonstrates how a streaming client works 
    37 * against the Salesforce Streaming API with generic notifications.
    38 **/
    39
    40public class StreamingClientExample {
    41
    42    // This URL is used only for logging in. The LoginResult 
    43    // returns a serverUrl which is then used for constructing
    44    // the streaming URL. The serverUrl points to the endpoint 
    45    // where your organization is hosted.
    46
    47    static final String LOGIN_ENDPOINT = "https://login.salesforce.com";
    48    private static final String USER_NAME = "change_this_to_your_testuser@yourcompany.com";
    49    private static final String PASSWORD = "change_this_to_your_testpassword"; 
    50    // NOTE: Putting passwords in code is not a good practice and not recommended.
    51
    52    // The channel to subscribe to. 
    53    // Be sure to create the StreamingChannel before running this sample.
    54    private static final String CHANNEL = "/u/notifications/ExampleUserChannel";
    55    private static final String STREAMING_ENDPOINT_URI = "/cometd/34.0";
    56
    57    // The long poll duration.
    58    private static final int CONNECTION_TIMEOUT = 20 * 1000;  // milliseconds
    59    private static final int READ_TIMEOUT = 120 * 1000; // milliseconds
    60
    61    public static void main(String[] args) throws Exception {
    62
    63    	System.out.println("Running streaming client example....");
    64
    65        final BayeuxClient client = makeClient();
    66        client.getChannel(Channel.META_HANDSHAKE).addListener
    67            (new ClientSessionChannel.MessageListener() {
    68
    69            public void onMessage(ClientSessionChannel channel, Message message) {
    70
    71            	System.out.println("[CHANNEL:META_HANDSHAKE]: " + message);
    72
    73                boolean success = message.isSuccessful();
    74                if (!success) {
    75                    String error = (String) message.get("error");
    76                    if (error != null) {
    77                        System.out.println("Error during HANDSHAKE: " + error);
    78                        System.out.println("Exiting...");
    79                        System.exit(1);
    80                    }
    81
    82                    Exception exception = (Exception) message.get("exception");
    83                    if (exception != null) {
    84                        System.out.println("Exception during HANDSHAKE: ");
    85                        exception.printStackTrace();
    86                        System.out.println("Exiting...");
    87                        System.exit(1);
    88
    89                    }
    90                }
    91            }
    92
    93        });
    94
    95        client.getChannel(Channel.META_CONNECT).addListener(
    96            new ClientSessionChannel.MessageListener() {
    97            public void onMessage(ClientSessionChannel channel, Message message) {
    98
    99                System.out.println("[CHANNEL:META_CONNECT]: " + message);
    100
    101                boolean success = message.isSuccessful();
    102                if (!success) {
    103                    String error = (String) message.get("error");
    104                    if (error != null) {
    105                        System.out.println("Error during CONNECT: " + error);
    106                        System.out.println("Exiting...");
    107                        System.exit(1);
    108                    }
    109                }
    110            }
    111
    112        });
    113
    114        client.getChannel(Channel.META_SUBSCRIBE).addListener(
    115            new ClientSessionChannel.MessageListener() {
    116
    117            public void onMessage(ClientSessionChannel channel, Message message) {
    118
    119            	System.out.println("[CHANNEL:META_SUBSCRIBE]: " + message);
    120                boolean success = message.isSuccessful();
    121                if (!success) {
    122                    String error = (String) message.get("error");
    123                    if (error != null) {
    124                        System.out.println("Error during SUBSCRIBE: " + error);
    125                        System.out.println("Exiting...");
    126                        System.exit(1);
    127                    }
    128                }
    129            }
    130        });
    131
    132
    133
    134        client.handshake();
    135        System.out.println("Waiting for handshake");
    136        
    137        boolean handshaken = client.waitFor(10 * 1000, BayeuxClient.State.CONNECTED);
    138        if (!handshaken) {
    139            System.out.println("Failed to handshake: " + client);
    140            System.exit(1);
    141        }
    142
    143
    144
    145        System.out.println("Subscribing for channel: " + CHANNEL);
    146
    147        client.getChannel(CHANNEL).subscribe(new MessageListener() {
    148            @Override
    149            public void onMessage(ClientSessionChannel channel, Message message) {
    150                System.out.println("Received Message: " + message);
    151            }
    152        });
    153
    154        System.out.println("Waiting for streamed data from your organization ...");
    155        while (true) {
    156            // This infinite loop is for demo only,
    157            // to receive streamed events on the 
    158            // specified topic from your organization.
    159        }
    160    }
    161
    162
    163
    164    private static BayeuxClient makeClient() throws Exception {
    165        HttpClient httpClient = new HttpClient();
    166        httpClient.setConnectTimeout(CONNECTION_TIMEOUT);
    167        httpClient.setTimeout(READ_TIMEOUT);
    168        httpClient.start();
    169
    170        String[] pair = SoapLoginUtil.login(httpClient, USER_NAME, PASSWORD);
    171        
    172        if (pair == null) {
    173            System.exit(1);
    174        }
    175
    176        assert pair.length == 2;
    177        final String sessionid = pair[0];
    178        String endpoint = pair[1];
    179        System.out.println("Login successful!\nEndpoint: " + endpoint 
    180            + "\nSessionid=" + sessionid);
    181
    182        Map<String, Object> options = new HashMap<String, Object>();
    183        options.put(ClientTransport.TIMEOUT_OPTION, READ_TIMEOUT);
    184        LongPollingTransport transport = new LongPollingTransport(
    185          options, httpClient) {
    186
    187        	@Override
    188            protected void customize(ContentExchange exchange) {
    189                super.customize(exchange);
    190                exchange.addRequestHeader("Authorization", "OAuth " + sessionid);
    191            }
    192        };
    193
    194        BayeuxClient client = new BayeuxClient(salesforceStreamingEndpoint(
    195            endpoint), transport);
    196        return client;
    197    }
    198
    199    private static String salesforceStreamingEndpoint(String endpoint)
    200        throws MalformedURLException {
    201        return new URL(endpoint + STREAMING_ENDPOINT_URI).toExternalForm();
    202    }
    203
    204}
    205
    206
  3. Edit StreamingClientExample.java and modify the following values:
    File Name Static Resource Name
    USER_NAME Username of the logged-in user
    PASSWORD Password for the USER_NAME (or logged-in user)
    CHANNEL /u/notifications/ExampleUserChannel
    LOGIN_ENDPOINT https://test.salesforce.com (Only if you are using a sandbox. If you are in a production organization, no change is required for LOGIN_ENDPOINT.)
  4. Add the following code to a Java source file named SoapLoginUtil.java. This code sends a username and password to the server and receives the session ID.

    Never handle the usernames and passwords of others. Before using in a production environment, delegate the login to OAuth.

    Important

    1swfobject.registerObject("clippy.codeblock-1", "9");
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17package demo;
    18
    19import java.io.ByteArrayInputStream;
    20import java.io.IOException;
    21import java.io.UnsupportedEncodingException;
    22import java.net.MalformedURLException;
    23import java.net.URL;
    24
    25import org.eclipse.jetty.client.ContentExchange;
    26import org.eclipse.jetty.client.HttpClient;
    27import org.xml.sax.Attributes;
    28import org.xml.sax.SAXException;
    29import org.xml.sax.helpers.DefaultHandler;
    30
    31import javax.xml.parsers.ParserConfigurationException;
    32import javax.xml.parsers.SAXParser;
    33import javax.xml.parsers.SAXParserFactory;
    34
    35public final class SoapLoginUtil {
    36
    37    // The enterprise SOAP API endpoint used for the login call in this example.
    38    private static final String SERVICES_SOAP_PARTNER_ENDPOINT = "/services/Soap/u/22.0/";
    39
    40    private static final String ENV_START =
    41            "<soapenv:Envelope xmlns:soapenv='http://schemas.xmlsoap.org/soap/envelope/' "
    42                    + "xmlns:xsi='http://www.w3.org/2001/XMLSchema-instance' " +
    43                    "xmlns:urn='urn:partner.soap.sforce.com'><soapenv:Body>";
    44
    45    private static final String ENV_END = "</soapenv:Body></soapenv:Envelope>";
    46
    47    private static byte[] soapXmlForLogin(String username, String password) 
    48        throws UnsupportedEncodingException {
    49        return (ENV_START +
    50                "  <urn:login>" +
    51                "    <urn:username>" + username + "</urn:username>" +
    52                "    <urn:password>" + password + "</urn:password>" +
    53                "  </urn:login>" +
    54                ENV_END).getBytes("UTF-8");
    55    }
    56
    57    public static String[] login(HttpClient client, String username, String password)
    58            throws IOException, InterruptedException, SAXException, 
    59                ParserConfigurationException {
    60
    61        ContentExchange exchange = new ContentExchange();
    62        exchange.setMethod("POST");
    63        exchange.setURL(getSoapURL());
    64        exchange.setRequestContentSource(new ByteArrayInputStream(soapXmlForLogin(
    65            username, password)));
    66        exchange.setRequestHeader("Content-Type", "text/xml");
    67        exchange.setRequestHeader("SOAPAction", "''");
    68        exchange.setRequestHeader("PrettyPrint", "Yes");
    69
    70        client.send(exchange);
    71        exchange.waitForDone();
    72        String response = exchange.getResponseContent();
    73
    74        SAXParserFactory spf = SAXParserFactory.newInstance();
    75        spf.setNamespaceAware(true);
    76        SAXParser saxParser = spf.newSAXParser();
    77
    78        LoginResponseParser parser = new LoginResponseParser();
    79        saxParser.parse(new ByteArrayInputStream(
    80            response.getBytes("UTF-8")), parser);
    81
    82        if (parser.sessionId == null || parser.serverUrl == null) {
    83            System.out.println("Login Failed!\n" + response);
    84            return null;
    85        }
    86
    87        URL soapEndpoint = new URL(parser.serverUrl);
    88        StringBuilder endpoint = new StringBuilder()
    89            .append(soapEndpoint.getProtocol())
    90            .append("://")
    91            .append(soapEndpoint.getHost());
    92
    93        if (soapEndpoint.getPort() > 0) endpoint.append(":")
    94            .append(soapEndpoint.getPort());
    95        return new String[] {parser.sessionId, endpoint.toString()};
    96    }
    97
    98    private static String getSoapURL() throws MalformedURLException {
    99        return new URL(StreamingClientExample.LOGIN_ENDPOINT + 
    100            getSoapUri()).toExternalForm();
    101    }
    102
    103    private static String getSoapUri() {
    104        return SERVICES_SOAP_PARTNER_ENDPOINT;
    105    }
    106
    107    private static class LoginResponseParser extends DefaultHandler {
    108
    109        private boolean inSessionId;
    110        private String sessionId;
    111
    112        private boolean inServerUrl;
    113        private String serverUrl;
    114
    115        @Override
    116        public void characters(char[] ch, int start, int length) {
    117            if (inSessionId) sessionId = new String(ch, start, length);
    118            if (inServerUrl) serverUrl = new String(ch, start, length);
    119        }
    120
    121        @Override
    122        public void endElement(String uri, String localName, String qName) {
    123            if (localName != null) {
    124                if (localName.equals("sessionId")) {
    125                    inSessionId = false;
    126                }
    127                
    128                if (localName.equals("serverUrl")) {
    129                    inServerUrl = false;
    130                }
    131            }
    132        }
    133
    134        @Override
    135        public void startElement(String uri, String localName, 
    136            String qName, Attributes attributes) {
    137            if (localName != null) {
    138                if (localName.equals("sessionId")) {
    139                    inSessionId = true;
    140                }
    141
    142                if (localName.equals("serverUrl")) {
    143                    inServerUrl = true;
    144                }
    145            }
    146        }
    147    }
    148}
  5. When you run this client app and generate notifications using the REST resource, the output will look something like:
    1Running streaming client example....
    2Login successful!
    3Endpoint: https://www.salesforce.com
    4Sessionid=00DD0000000FSp9!AQIAQIVjGYijFhiAROTc455T6kEVeJGXuW5VCnp
    5    LANCMawS7.p5fXbjYlqCgx7They_zFjmP5n9HxvfUA6xGSGtC1Nb6P4S.
    6
    7Waiting for handshake
    8[CHANNEL:META_HANDSHAKE]: 
    9{
    10    "id":"1",
    11    "minimumVersion":"1.0",
    12    "supportedConnectionTypes":["long-polling"],
    13    "successful":true,
    14    "channel":"/meta/handshake",
    15    "clientId":"31t0cjzfbgnfqn1rggumba0k98u",
    16    "version":"1.0"
    17}
    18
    19[CHANNEL:META_CONNECT]: 
    20{
    21    "id":"2",
    22    "successful":true,
    23    "advice":{"interval":0,"reconnect":"retry","timeout":110000},
    24    "channel":"/meta/connect"}
    25    Subscribing for channel: /u/notifications/ExampleUserChannel
    26    Waiting for streamed data from your organization ...
    27[CHANNEL:META_SUBSCRIBE]: 
    28{
    29    "id":"4",
    30    "subscription":"/u/notifications/ExampleUserChannel",
    31    "successful":true,
    32    "channel":"/meta/subscribe"
    33}
    34
    35[CHANNEL:META_CONNECT]: 
    36{
    37    "id":"3",
    38    "successful":true,
    39    "channel":"/meta/connect"
    40}
    41
    42Received Message: 
    43{
    44    "data":
    45    {
    46      "event":
    47      {
    48        "createdDate":"2013-07-30T23:15:59.000+0000"
    49      },
    50      "payload":"Broadcast message to all subscribers"
    51    },
    52    "channel":"/u/notifications/ExampleUserChannel",
    53    "clientId":"8173z2cplh8q6m1rmud93zygnf8"
    54}
    55[CHANNEL:META_CONNECT]: 
    56{
    57    "id":"5",
    58    "successful":true,
    59    "channel":"/meta/connect"
    60}