Newer Version Available

This content describes an older version of this product. View Latest

Create a Java Client

Create a Java client that uses Bayeux and CometD to subscribe to the channel.
  1. Download and install the CometD and Jetty .jar files if necessary.
  2. In a new Java project, add the following code to a Java source file named StreamingClientExample.java. This code subscribes to the Streaming channel you created and listens for notifications. Depending on your Java development environment, you might have to rename this file and class to Main.
    1swfobject.registerObject("clippy.codeblock-0", "9");package demo;
    2
    3import org.cometd.bayeux.Channel;
    4import org.cometd.bayeux.Message;
    5import org.cometd.bayeux.client.ClientSessionChannel;
    6import org.cometd.bayeux.client.ClientSessionChannel.MessageListener;
    7import org.cometd.client.BayeuxClient;
    8import org.cometd.client.transport.ClientTransport;
    9import org.cometd.client.transport.LongPollingTransport;
    10
    11import org.eclipse.jetty.client.ContentExchange;
    12import org.eclipse.jetty.client.HttpClient;
    13
    14import java.net.MalformedURLException;
    15import java.net.URL;
    16import java.util.HashMap;
    17import java.util.Map;
    18
    19/**
    20 * This example demonstrates how a streaming client works 
    21 * against the Salesforce Streaming API with generic notifications.
    22 **/
    23
    24public class StreamingClientExample {
    25
    26    // This URL is used only for logging in. The LoginResult 
    27    // returns a serverUrl which is then used for constructing
    28    // the streaming URL. The serverUrl points to the endpoint 
    29    // where your organization is hosted.
    30
    31    static final String LOGIN_ENDPOINT = "https://login.salesforce.com";
    32    private static final String USER_NAME = "change_this_to_your_testuser@yourcompany.com";
    33    private static final String PASSWORD = "change_this_to_your_testpassword"; 
    34    // NOTE: Putting passwords in code is not a good practice and not recommended.
    35
    36    // The channel to subscribe to. 
    37    // Be sure to create the StreamingChannel before running this sample.
    38    private static final String CHANNEL = "/u/notifications/ExampleUserChannel";
    39    private static final String STREAMING_ENDPOINT_URI = "/cometd/30.0";
    40
    41    // The long poll duration.
    42    private static final int CONNECTION_TIMEOUT = 20 * 1000;  // milliseconds
    43    private static final int READ_TIMEOUT = 120 * 1000; // milliseconds
    44
    45    public static void main(String[] args) throws Exception {
    46
    47    	System.out.println("Running streaming client example....");
    48
    49        final BayeuxClient client = makeClient();
    50        client.getChannel(Channel.META_HANDSHAKE).addListener
    51            (new ClientSessionChannel.MessageListener() {
    52
    53            public void onMessage(ClientSessionChannel channel, Message message) {
    54
    55            	System.out.println("[CHANNEL:META_HANDSHAKE]: " + message);
    56
    57                boolean success = message.isSuccessful();
    58                if (!success) {
    59                    String error = (String) message.get("error");
    60                    if (error != null) {
    61                        System.out.println("Error during HANDSHAKE: " + error);
    62                        System.out.println("Exiting...");
    63                        System.exit(1);
    64                    }
    65
    66                    Exception exception = (Exception) message.get("exception");
    67                    if (exception != null) {
    68                        System.out.println("Exception during HANDSHAKE: ");
    69                        exception.printStackTrace();
    70                        System.out.println("Exiting...");
    71                        System.exit(1);
    72
    73                    }
    74                }
    75            }
    76
    77        });
    78
    79        client.getChannel(Channel.META_CONNECT).addListener(
    80            new ClientSessionChannel.MessageListener() {
    81            public void onMessage(ClientSessionChannel channel, Message message) {
    82
    83                System.out.println("[CHANNEL:META_CONNECT]: " + message);
    84
    85                boolean success = message.isSuccessful();
    86                if (!success) {
    87                    String error = (String) message.get("error");
    88                    if (error != null) {
    89                        System.out.println("Error during CONNECT: " + error);
    90                        System.out.println("Exiting...");
    91                        System.exit(1);
    92                    }
    93                }
    94            }
    95
    96        });
    97
    98        client.getChannel(Channel.META_SUBSCRIBE).addListener(
    99            new ClientSessionChannel.MessageListener() {
    100
    101            public void onMessage(ClientSessionChannel channel, Message message) {
    102
    103            	System.out.println("[CHANNEL:META_SUBSCRIBE]: " + message);
    104                boolean success = message.isSuccessful();
    105                if (!success) {
    106                    String error = (String) message.get("error");
    107                    if (error != null) {
    108                        System.out.println("Error during SUBSCRIBE: " + error);
    109                        System.out.println("Exiting...");
    110                        System.exit(1);
    111                    }
    112                }
    113            }
    114        });
    115
    116
    117
    118        client.handshake();
    119        System.out.println("Waiting for handshake");
    120        
    121        boolean handshaken = client.waitFor(10 * 1000, BayeuxClient.State.CONNECTED);
    122        if (!handshaken) {
    123            System.out.println("Failed to handshake: " + client);
    124            System.exit(1);
    125        }
    126
    127
    128
    129        System.out.println("Subscribing for channel: " + CHANNEL);
    130
    131        client.getChannel(CHANNEL).subscribe(new MessageListener() {
    132            @Override
    133            public void onMessage(ClientSessionChannel channel, Message message) {
    134                System.out.println("Received Message: " + message);
    135            }
    136        });
    137
    138        System.out.println("Waiting for streamed data from your organization ...");
    139        while (true) {
    140            // This infinite loop is for demo only,
    141            // to receive streamed events on the 
    142            // specified topic from your organization.
    143        }
    144    }
    145
    146
    147
    148    private static BayeuxClient makeClient() throws Exception {
    149        HttpClient httpClient = new HttpClient();
    150        httpClient.setConnectTimeout(CONNECTION_TIMEOUT);
    151        httpClient.setTimeout(READ_TIMEOUT);
    152        httpClient.start();
    153
    154        String[] pair = SoapLoginUtil.login(httpClient, USER_NAME, PASSWORD);
    155        
    156        if (pair == null) {
    157            System.exit(1);
    158        }
    159
    160        assert pair.length == 2;
    161        final String sessionid = pair[0];
    162        String endpoint = pair[1];
    163        System.out.println("Login successful!\nEndpoint: " + endpoint 
    164            + "\nSessionid=" + sessionid);
    165
    166        Map<String, Object> options = new HashMap<String, Object>();
    167        options.put(ClientTransport.TIMEOUT_OPTION, READ_TIMEOUT);
    168        LongPollingTransport transport = new LongPollingTransport(
    169          options, httpClient) {
    170
    171        	@Override
    172            protected void customize(ContentExchange exchange) {
    173                super.customize(exchange);
    174                exchange.addRequestHeader("Authorization", "OAuth " + sessionid);
    175            }
    176        };
    177
    178        BayeuxClient client = new BayeuxClient(salesforceStreamingEndpoint(
    179            endpoint), transport);
    180        return client;
    181    }
    182
    183    private static String salesforceStreamingEndpoint(String endpoint)
    184        throws MalformedURLException {
    185        return new URL(endpoint + STREAMING_ENDPOINT_URI).toExternalForm();
    186    }
    187
    188}
    189
    190
  3. Edit StreamingClientExample.java and modify the following values:
    File Name Static Resource Name
    USER_NAME Username of the logged-in user
    PASSWORD Password for the USER_NAME (or logged-in user)
    CHANNEL /u/notifications/ExampleUserChannel
    LOGIN_ENDPOINT https://test.salesforce.com (Only if you are using a sandbox. If you are in a production organization, no change is required for LOGIN_ENDPOINT.)
  4. Add the following code to a Java source file named SoapLoginUtil.java. This code sends a username and password to the server and receives the session ID.

    Never handle the usernames and passwords of others. Before using in a production environment, delegate the login to OAuth.

    Important

    1swfobject.registerObject("clippy.codeblock-1", "9");package demo;
    2
    3import java.io.ByteArrayInputStream;
    4import java.io.IOException;
    5import java.io.UnsupportedEncodingException;
    6import java.net.MalformedURLException;
    7import java.net.URL;
    8
    9import org.eclipse.jetty.client.ContentExchange;
    10import org.eclipse.jetty.client.HttpClient;
    11import org.xml.sax.Attributes;
    12import org.xml.sax.SAXException;
    13import org.xml.sax.helpers.DefaultHandler;
    14
    15import javax.xml.parsers.ParserConfigurationException;
    16import javax.xml.parsers.SAXParser;
    17import javax.xml.parsers.SAXParserFactory;
    18
    19public final class SoapLoginUtil {
    20
    21    // The enterprise SOAP API endpoint used for the login call in this example.
    22    private static final String SERVICES_SOAP_PARTNER_ENDPOINT = "/services/Soap/u/22.0/";
    23
    24    private static final String ENV_START =
    25            "<soapenv:Envelope xmlns:soapenv='http://schemas.xmlsoap.org/soap/envelope/' "
    26                    + "xmlns:xsi='http://www.w3.org/2001/XMLSchema-instance' " +
    27                    "xmlns:urn='urn:partner.soap.sforce.com'><soapenv:Body>";
    28
    29    private static final String ENV_END = "</soapenv:Body></soapenv:Envelope>";
    30
    31    private static byte[] soapXmlForLogin(String username, String password) 
    32        throws UnsupportedEncodingException {
    33        return (ENV_START +
    34                "  <urn:login>" +
    35                "    <urn:username>" + username + "</urn:username>" +
    36                "    <urn:password>" + password + "</urn:password>" +
    37                "  </urn:login>" +
    38                ENV_END).getBytes("UTF-8");
    39    }
    40
    41    public static String[] login(HttpClient client, String username, String password)
    42            throws IOException, InterruptedException, SAXException, 
    43                ParserConfigurationException {
    44
    45        ContentExchange exchange = new ContentExchange();
    46        exchange.setMethod("POST");
    47        exchange.setURL(getSoapURL());
    48        exchange.setRequestContentSource(new ByteArrayInputStream(soapXmlForLogin(
    49            username, password)));
    50        exchange.setRequestHeader("Content-Type", "text/xml");
    51        exchange.setRequestHeader("SOAPAction", "''");
    52        exchange.setRequestHeader("PrettyPrint", "Yes");
    53
    54        client.send(exchange);
    55        exchange.waitForDone();
    56        String response = exchange.getResponseContent();
    57
    58        SAXParserFactory spf = SAXParserFactory.newInstance();
    59        spf.setNamespaceAware(true);
    60        SAXParser saxParser = spf.newSAXParser();
    61
    62        LoginResponseParser parser = new LoginResponseParser();
    63        saxParser.parse(new ByteArrayInputStream(
    64            response.getBytes("UTF-8")), parser);
    65
    66        if (parser.sessionId == null || parser.serverUrl == null) {
    67            System.out.println("Login Failed!\n" + response);
    68            return null;
    69        }
    70
    71        URL soapEndpoint = new URL(parser.serverUrl);
    72        StringBuilder endpoint = new StringBuilder()
    73            .append(soapEndpoint.getProtocol())
    74            .append("://")
    75            .append(soapEndpoint.getHost());
    76
    77        if (soapEndpoint.getPort() > 0) endpoint.append(":")
    78            .append(soapEndpoint.getPort());
    79        return new String[] {parser.sessionId, endpoint.toString()};
    80    }
    81
    82    private static String getSoapURL() throws MalformedURLException {
    83        return new URL(StreamingClientExample.LOGIN_ENDPOINT + 
    84            getSoapUri()).toExternalForm();
    85    }
    86
    87    private static String getSoapUri() {
    88        return SERVICES_SOAP_PARTNER_ENDPOINT;
    89    }
    90
    91    private static class LoginResponseParser extends DefaultHandler {
    92
    93        private boolean inSessionId;
    94        private String sessionId;
    95
    96        private boolean inServerUrl;
    97        private String serverUrl;
    98
    99        @Override
    100        public void characters(char[] ch, int start, int length) {
    101            if (inSessionId) sessionId = new String(ch, start, length);
    102            if (inServerUrl) serverUrl = new String(ch, start, length);
    103        }
    104
    105        @Override
    106        public void endElement(String uri, String localName, String qName) {
    107            if (localName != null) {
    108                if (localName.equals("sessionId")) {
    109                    inSessionId = false;
    110                }
    111                
    112                if (localName.equals("serverUrl")) {
    113                    inServerUrl = false;
    114                }
    115            }
    116        }
    117
    118        @Override
    119        public void startElement(String uri, String localName, 
    120            String qName, Attributes attributes) {
    121            if (localName != null) {
    122                if (localName.equals("sessionId")) {
    123                    inSessionId = true;
    124                }
    125
    126                if (localName.equals("serverUrl")) {
    127                    inServerUrl = true;
    128                }
    129            }
    130        }
    131    }
    132}
  5. When you run this client app and generate notifications using the REST resource, the output will look something like:
    1Running streaming client example....
    2Login successful!
    3Endpoint: https://www.salesforce.com
    4Sessionid=00DD0000000FSp9!AQIAQIVjGYijFhiAROTc455T6kEVeJGXuW5VCnp
    5    LANCMawS7.p5fXbjYlqCgx7They_zFjmP5n9HxvfUA6xGSGtC1Nb6P4S.
    6
    7Waiting for handshake
    8[CHANNEL:META_HANDSHAKE]: 
    9{
    10    "id":"1",
    11    "minimumVersion":"1.0",
    12    "supportedConnectionTypes":["long-polling"],
    13    "successful":true,
    14    "channel":"/meta/handshake",
    15    "clientId":"31t0cjzfbgnfqn1rggumba0k98u",
    16    "version":"1.0"
    17}
    18
    19[CHANNEL:META_CONNECT]: 
    20{
    21    "id":"2",
    22    "successful":true,
    23    "advice":{"interval":0,"reconnect":"retry","timeout":110000},
    24    "channel":"/meta/connect"}
    25    Subscribing for channel: /u/notifications/ExampleUserChannel
    26    Waiting for streamed data from your organization ...
    27[CHANNEL:META_SUBSCRIBE]: 
    28{
    29    "id":"4",
    30    "subscription":"/u/notifications/ExampleUserChannel",
    31    "successful":true,
    32    "channel":"/meta/subscribe"
    33}
    34
    35[CHANNEL:META_CONNECT]: 
    36{
    37    "id":"3",
    38    "successful":true,
    39    "channel":"/meta/connect"
    40}
    41
    42Received Message: 
    43{
    44    "data":
    45    {
    46      "event":
    47      {
    48        "createdDate":"2013-07-30T23:15:59.000+0000"
    49      },
    50      "payload":"Broadcast message to all subscribers"
    51    },
    52    "channel":"/u/notifications/ExampleUserChannel",
    53    "clientId":"8173z2cplh8q6m1rmud93zygnf8"
    54}
    55[CHANNEL:META_CONNECT]: 
    56{
    57    "id":"5",
    58    "successful":true,
    59    "channel":"/meta/connect"
    60}