Newer Version Available

This content describes an older version of this product. View Latest

Step 4: Add the Source Code

  1. Add the following code to a Java source file named StreamingClientExample.java. This code subscribes to the PushTopic channel and handles the streaming information.
    1swfobject.registerObject("clippy.codeblock-0", "9");package demo;
    2
    3import org.cometd.bayeux.Channel;
    4import org.cometd.bayeux.Message;
    5import org.cometd.bayeux.client.ClientSessionChannel;
    6import org.cometd.bayeux.client.ClientSessionChannel.MessageListener;
    7import org.cometd.client.BayeuxClient;
    8import org.cometd.client.transport.ClientTransport;
    9import org.cometd.client.transport.LongPollingTransport;
    10
    11import org.eclipse.jetty.client.ContentExchange;
    12import org.eclipse.jetty.client.HttpClient;
    13
    14import java.net.MalformedURLException;
    15import java.net.URL;
    16import java.util.HashMap;
    17import java.util.Map;
    18
    19/**
    20 * This example demonstrates how a streaming client works 
    21 * against the Salesforce Streaming API.
    22 **/
    23
    24public class StreamingClientExample {
    25
    26    // This URL is used only for logging in. The LoginResult 
    27    // returns a serverUrl which is then used for constructing
    28    // the streaming URL. The serverUrl points to the endpoint 
    29    // where your organization is hosted.
    30
    31    static final String LOGIN_ENDPOINT = "https://login.salesforce.com";
    32    private static final String USER_NAME = "change_this_to_your_testuser@yourcompany.com";
    33    private static final String PASSWORD = "change_this_to_your_testpassword"; 
    34    // NOTE: Putting passwords in code is not a good practice and not recommended.
    35
    36    // Set this to true only when using this client 
    37    // against the Summer'11 release (API version=22.0).
    38    private static final boolean VERSION_22 = false;  
    39    private static final boolean USE_COOKIES = VERSION_22;
    40
    41    // The channel to subscribe to. Same as the name of the PushTopic. 
    42    // Be sure to create this topic before running this sample.
    43    private static final String CHANNEL = VERSION_22 ? "/InvoiceStatementUpdates" : "/topic/InvoiceStatementUpdates";
    44    private static final String STREAMING_ENDPOINT_URI = VERSION_22 ? 
    45        "/cometd" : "/cometd/30.0";
    46
    47    // The long poll duration.
    48    private static final int CONNECTION_TIMEOUT = 20 * 1000;  // milliseconds
    49    private static final int READ_TIMEOUT = 120 * 1000; // milliseconds
    50
    51    public static void main(String[] args) throws Exception {
    52
    53    	System.out.println("Running streaming client example....");
    54
    55        final BayeuxClient client = makeClient();
    56        client.getChannel(Channel.META_HANDSHAKE).addListener
    57            (new ClientSessionChannel.MessageListener() {
    58
    59            public void onMessage(ClientSessionChannel channel, Message message) {
    60
    61            	System.out.println("[CHANNEL:META_HANDSHAKE]: " + message);
    62
    63                boolean success = message.isSuccessful();
    64                if (!success) {
    65                    String error = (String) message.get("error");
    66                    if (error != null) {
    67                        System.out.println("Error during HANDSHAKE: " + error);
    68                        System.out.println("Exiting...");
    69                        System.exit(1);
    70                    }
    71
    72                    Exception exception = (Exception) message.get("exception");
    73                    if (exception != null) {
    74                        System.out.println("Exception during HANDSHAKE: ");
    75                        exception.printStackTrace();
    76                        System.out.println("Exiting...");
    77                        System.exit(1);
    78
    79                    }
    80                }
    81            }
    82
    83        });
    84
    85        client.getChannel(Channel.META_CONNECT).addListener(
    86            new ClientSessionChannel.MessageListener() {
    87            public void onMessage(ClientSessionChannel channel, Message message) {
    88
    89                System.out.println("[CHANNEL:META_CONNECT]: " + message);
    90
    91                boolean success = message.isSuccessful();
    92                if (!success) {
    93                    String error = (String) message.get("error");
    94                    if (error != null) {
    95                        System.out.println("Error during CONNECT: " + error);
    96                        System.out.println("Exiting...");
    97                        System.exit(1);
    98                    }
    99                }
    100            }
    101
    102        });
    103
    104        client.getChannel(Channel.META_SUBSCRIBE).addListener(
    105            new ClientSessionChannel.MessageListener() {
    106
    107            public void onMessage(ClientSessionChannel channel, Message message) {
    108
    109            	System.out.println("[CHANNEL:META_SUBSCRIBE]: " + message);
    110                boolean success = message.isSuccessful();
    111                if (!success) {
    112                    String error = (String) message.get("error");
    113                    if (error != null) {
    114                        System.out.println("Error during SUBSCRIBE: " + error);
    115                        System.out.println("Exiting...");
    116                        System.exit(1);
    117                    }
    118                }
    119            }
    120        });
    121
    122
    123
    124        client.handshake();
    125        System.out.println("Waiting for handshake");
    126        
    127        boolean handshaken = client.waitFor(10 * 1000, BayeuxClient.State.CONNECTED);
    128        if (!handshaken) {
    129            System.out.println("Failed to handshake: " + client);
    130            System.exit(1);
    131        }
    132
    133
    134
    135        System.out.println("Subscribing for channel: " + CHANNEL);
    136
    137        client.getChannel(CHANNEL).subscribe(new MessageListener() {
    138            @Override
    139            public void onMessage(ClientSessionChannel channel, Message message) {
    140                System.out.println("Received Message: " + message);
    141            }
    142        });
    143
    144        System.out.println("Waiting for streamed data from your organization ...");
    145        while (true) {
    146            // This infinite loop is for demo only,
    147            // to receive streamed events on the 
    148            // specified topic from your organization.
    149        }
    150    }
    151
    152
    153
    154    private static BayeuxClient makeClient() throws Exception {
    155        HttpClient httpClient = new HttpClient();
    156        httpClient.setConnectTimeout(CONNECTION_TIMEOUT);
    157        httpClient.setTimeout(READ_TIMEOUT);
    158        httpClient.start();
    159
    160        String[] pair = SoapLoginUtil.login(httpClient, USER_NAME, PASSWORD);
    161        
    162        if (pair == null) {
    163            System.exit(1);
    164        }
    165
    166        assert pair.length == 2;
    167        final String sessionid = pair[0];
    168        String endpoint = pair[1];
    169        System.out.println("Login successful!\nEndpoint: " + endpoint 
    170            + "\nSessionid=" + sessionid);
    171
    172        Map<String, Object> options = new HashMap<String, Object>();
    173        options.put(ClientTransport.TIMEOUT_OPTION, READ_TIMEOUT);
    174        LongPollingTransport transport = new LongPollingTransport(
    175          options, httpClient) {
    176
    177        	@Override
    178            protected void customize(ContentExchange exchange) {
    179                super.customize(exchange);
    180                exchange.addRequestHeader("Authorization", "OAuth " + sessionid);
    181            }
    182        };
    183
    184        BayeuxClient client = new BayeuxClient(salesforceStreamingEndpoint(
    185            endpoint), transport);
    186        if (USE_COOKIES) establishCookies(client, USER_NAME, sessionid);
    187        return client;
    188    }
    189
    190    private static String salesforceStreamingEndpoint(String endpoint)
    191        throws MalformedURLException {
    192        return new URL(endpoint + STREAMING_ENDPOINT_URI).toExternalForm();
    193    }
    194
    195    private static void establishCookies(BayeuxClient client, String user, 
    196        String sid) {
    197        client.setCookie("com.salesforce.LocaleInfo", "us", 24 * 60 * 60 * 1000);
    198        client.setCookie("login", user, 24 * 60 * 60 * 1000);
    199        client.setCookie("sid", sid, 24 * 60 * 60 * 1000);
    200        client.setCookie("language", "en_US", 24 * 60 * 60 * 1000);
    201    }
    202}
    203
    204
  2. Edit StreamingClientExample.java and modify the following values:
    File Name Static Resource Name
    USER_NAME Username of the logged-in user
    PASSWORD Password for the USER_NAME (or logged-in user)
    CHANNEL /topic/InvoiceStatementUpdates
    LOGIN_ENDPOINT https://test.salesforce.com (Only if you are using a sandbox. If you are in a production organization, no change is required for LOGIN_ENDPOINT.)
  3. Add the following code to a Java source file named SoapLoginUtil.java. This code sends a username and password to the server and receives the session ID.

    Never handle the usernames and passwords of others. Before using in a production environment, delegate the login to OAuth.

    Important

    1swfobject.registerObject("clippy.codeblock-1", "9");package demo;
    2
    3import java.io.ByteArrayInputStream;
    4import java.io.IOException;
    5import java.io.UnsupportedEncodingException;
    6import java.net.MalformedURLException;
    7import java.net.URL;
    8
    9import org.eclipse.jetty.client.ContentExchange;
    10import org.eclipse.jetty.client.HttpClient;
    11import org.xml.sax.Attributes;
    12import org.xml.sax.SAXException;
    13import org.xml.sax.helpers.DefaultHandler;
    14
    15import javax.xml.parsers.ParserConfigurationException;
    16import javax.xml.parsers.SAXParser;
    17import javax.xml.parsers.SAXParserFactory;
    18
    19public final class SoapLoginUtil {
    20
    21    // The enterprise SOAP API endpoint used for the login call in this example.
    22    private static final String SERVICES_SOAP_PARTNER_ENDPOINT = "/services/Soap/u/22.0/";
    23
    24    private static final String ENV_START =
    25            "<soapenv:Envelope xmlns:soapenv='http://schemas.xmlsoap.org/soap/envelope/' "
    26                    + "xmlns:xsi='http://www.w3.org/2001/XMLSchema-instance' " +
    27                    "xmlns:urn='urn:partner.soap.sforce.com'><soapenv:Body>";
    28
    29    private static final String ENV_END = "</soapenv:Body></soapenv:Envelope>";
    30
    31    private static byte[] soapXmlForLogin(String username, String password) 
    32        throws UnsupportedEncodingException {
    33        return (ENV_START +
    34                "  <urn:login>" +
    35                "    <urn:username>" + username + "</urn:username>" +
    36                "    <urn:password>" + password + "</urn:password>" +
    37                "  </urn:login>" +
    38                ENV_END).getBytes("UTF-8");
    39    }
    40
    41    public static String[] login(HttpClient client, String username, String password)
    42            throws IOException, InterruptedException, SAXException, 
    43                ParserConfigurationException {
    44
    45        ContentExchange exchange = new ContentExchange();
    46        exchange.setMethod("POST");
    47        exchange.setURL(getSoapURL());
    48        exchange.setRequestContentSource(new ByteArrayInputStream(soapXmlForLogin(
    49            username, password)));
    50        exchange.setRequestHeader("Content-Type", "text/xml");
    51        exchange.setRequestHeader("SOAPAction", "''");
    52        exchange.setRequestHeader("PrettyPrint", "Yes");
    53
    54        client.send(exchange);
    55        exchange.waitForDone();
    56        String response = exchange.getResponseContent();
    57
    58        SAXParserFactory spf = SAXParserFactory.newInstance();
    59        spf.setNamespaceAware(true);
    60        SAXParser saxParser = spf.newSAXParser();
    61
    62        LoginResponseParser parser = new LoginResponseParser();
    63        saxParser.parse(new ByteArrayInputStream(
    64            response.getBytes("UTF-8")), parser);
    65
    66        if (parser.sessionId == null || parser.serverUrl == null) {
    67            System.out.println("Login Failed!\n" + response);
    68            return null;
    69        }
    70
    71        URL soapEndpoint = new URL(parser.serverUrl);
    72        StringBuilder endpoint = new StringBuilder()
    73            .append(soapEndpoint.getProtocol())
    74            .append("://")
    75            .append(soapEndpoint.getHost());
    76
    77        if (soapEndpoint.getPort() > 0) endpoint.append(":")
    78            .append(soapEndpoint.getPort());
    79        return new String[] {parser.sessionId, endpoint.toString()};
    80    }
    81
    82    private static String getSoapURL() throws MalformedURLException {
    83        return new URL(StreamingClientExample.LOGIN_ENDPOINT + 
    84            getSoapUri()).toExternalForm();
    85    }
    86
    87    private static String getSoapUri() {
    88        return SERVICES_SOAP_PARTNER_ENDPOINT;
    89    }
    90
    91    private static class LoginResponseParser extends DefaultHandler {
    92
    93        private boolean inSessionId;
    94        private String sessionId;
    95
    96        private boolean inServerUrl;
    97        private String serverUrl;
    98
    99        @Override
    100        public void characters(char[] ch, int start, int length) {
    101            if (inSessionId) sessionId = new String(ch, start, length);
    102            if (inServerUrl) serverUrl = new String(ch, start, length);
    103        }
    104
    105        @Override
    106        public void endElement(String uri, String localName, String qName) {
    107            if (localName != null) {
    108                if (localName.equals("sessionId")) {
    109                    inSessionId = false;
    110                }
    111                
    112                if (localName.equals("serverUrl")) {
    113                    inServerUrl = false;
    114                }
    115            }
    116        }
    117
    118        @Override
    119        public void startElement(String uri, String localName, 
    120            String qName, Attributes attributes) {
    121            if (localName != null) {
    122                if (localName.equals("sessionId")) {
    123                    inSessionId = true;
    124                }
    125
    126                if (localName.equals("serverUrl")) {
    127                    inServerUrl = true;
    128                }
    129            }
    130        }
    131    }
    132}
  4. In a different browser window, create or modify an InvoiceStatement. After you create or change data that corresponds to the query in your PushTopic, the output looks something like this:
    1Running streaming client example....
    2Login successful!
    3Endpoint: https://www.salesforce.com
    4Sessionid=00DD0000000FSp9!AQIAQIVjGYijFhiAROTc455T6kEVeJGXuW5VCnp
    5    LANCMawS7.p5fXbjYlqCgx7They_zFjmP5n9HxvfUA6xGSGtC1Nb6P4S.
    6
    7Waiting for handshake
    8[CHANNEL:META_HANDSHAKE]: 
    9{
    10    "id":"1",
    11    "minimumVersion":"1.0",
    12    "supportedConnectionTypes":["long-polling"],
    13    "successful":true,
    14    "channel":"/meta/handshake",
    15    "clientId":"31t0cjzfbgnfqn1rggumba0k98u",
    16    "version":"1.0"
    17}
    18
    19[CHANNEL:META_CONNECT]: 
    20{
    21    "id":"2",
    22    "successful":true,
    23    "advice":{"interval":0,"reconnect":"retry","timeout":110000},
    24    "channel":"/meta/connect"}
    25    Subscribing for channel: /topic/InvoiceStatementUpdates
    26    Waiting for streamed data from your organization ...
    27[CHANNEL:META_SUBSCRIBE]: 
    28{
    29    "id":"4",
    30    "subscription":"/topic/InvoiceStatementUpdates",
    31    "successful":true,
    32    "channel":"/meta/subscribe"
    33}
    34
    35[CHANNEL:META_CONNECT]: 
    36{
    37    "id":"3",
    38    "successful":true,
    39    "channel":"/meta/connect"
    40}
    41
    42Received Message: 
    43{
    44"data":
    45    {
    46      "sobject":
    47      {
    48        "Name":"INV-0002",
    49        "Id":"001D000000J3fTHIAZ",
    50        "Status__c":"Pending"},
    51        "event":{"type":"updated",
    52        "createdDate":"2011-09-06T18:51:08.000+0000"
    53      }
    54    },
    55    "channel":"/topic/InvoiceStatementUpdates"
    56}
    57
    58[CHANNEL:META_CONNECT]: 
    59{
    60    "id":"5",
    61    "successful":true,
    62    "channel":"/meta/connect"
    63}