No Results
Search Tips:
- Please consider misspellings
- Try different search keywords
Newer Version Available
Step 4: Add the Source Code
- Add the following code to a Java source file named StreamingClientExample.java. This code subscribes to the
PushTopic channel and handles the streaming information.
1swfobject.registerObject("clippy.codeblock-0", "9");package demo; 2 3import org.cometd.bayeux.Channel; 4import org.cometd.bayeux.Message; 5import org.cometd.bayeux.client.ClientSessionChannel; 6import org.cometd.bayeux.client.ClientSessionChannel.MessageListener; 7import org.cometd.client.BayeuxClient; 8import org.cometd.client.transport.ClientTransport; 9import org.cometd.client.transport.LongPollingTransport; 10 11import org.eclipse.jetty.client.ContentExchange; 12import org.eclipse.jetty.client.HttpClient; 13 14import java.net.MalformedURLException; 15import java.net.URL; 16import java.util.HashMap; 17import java.util.Map; 18 19/** 20 * This example demonstrates how a streaming client works 21 * against the Salesforce Streaming API. 22 **/ 23 24public class StreamingClientExample { 25 26 // This URL is used only for logging in. The LoginResult 27 // returns a serverUrl which is then used for constructing 28 // the streaming URL. The serverUrl points to the endpoint 29 // where your organization is hosted. 30 31 static final String LOGIN_ENDPOINT = "https://login.salesforce.com"; 32 private static final String USER_NAME = "change_this_to_your_testuser@yourcompany.com"; 33 private static final String PASSWORD = "change_this_to_your_testpassword"; 34 // NOTE: Putting passwords in code is not a good practice and not recommended. 35 36 // Set this to true only when using this client 37 // against the Summer'11 release (API version=22.0). 38 private static final boolean VERSION_22 = false; 39 private static final boolean USE_COOKIES = VERSION_22; 40 41 // The channel to subscribe to. Same as the name of the PushTopic. 42 // Be sure to create this topic before running this sample. 43 private static final String CHANNEL = VERSION_22 ? "/InvoiceStatementUpdates" : "/topic/InvoiceStatementUpdates"; 44 private static final String STREAMING_ENDPOINT_URI = VERSION_22 ? 45 "/cometd" : "/cometd/30.0"; 46 47 // The long poll duration. 48 private static final int CONNECTION_TIMEOUT = 20 * 1000; // milliseconds 49 private static final int READ_TIMEOUT = 120 * 1000; // milliseconds 50 51 public static void main(String[] args) throws Exception { 52 53 System.out.println("Running streaming client example...."); 54 55 final BayeuxClient client = makeClient(); 56 client.getChannel(Channel.META_HANDSHAKE).addListener 57 (new ClientSessionChannel.MessageListener() { 58 59 public void onMessage(ClientSessionChannel channel, Message message) { 60 61 System.out.println("[CHANNEL:META_HANDSHAKE]: " + message); 62 63 boolean success = message.isSuccessful(); 64 if (!success) { 65 String error = (String) message.get("error"); 66 if (error != null) { 67 System.out.println("Error during HANDSHAKE: " + error); 68 System.out.println("Exiting..."); 69 System.exit(1); 70 } 71 72 Exception exception = (Exception) message.get("exception"); 73 if (exception != null) { 74 System.out.println("Exception during HANDSHAKE: "); 75 exception.printStackTrace(); 76 System.out.println("Exiting..."); 77 System.exit(1); 78 79 } 80 } 81 } 82 83 }); 84 85 client.getChannel(Channel.META_CONNECT).addListener( 86 new ClientSessionChannel.MessageListener() { 87 public void onMessage(ClientSessionChannel channel, Message message) { 88 89 System.out.println("[CHANNEL:META_CONNECT]: " + message); 90 91 boolean success = message.isSuccessful(); 92 if (!success) { 93 String error = (String) message.get("error"); 94 if (error != null) { 95 System.out.println("Error during CONNECT: " + error); 96 System.out.println("Exiting..."); 97 System.exit(1); 98 } 99 } 100 } 101 102 }); 103 104 client.getChannel(Channel.META_SUBSCRIBE).addListener( 105 new ClientSessionChannel.MessageListener() { 106 107 public void onMessage(ClientSessionChannel channel, Message message) { 108 109 System.out.println("[CHANNEL:META_SUBSCRIBE]: " + message); 110 boolean success = message.isSuccessful(); 111 if (!success) { 112 String error = (String) message.get("error"); 113 if (error != null) { 114 System.out.println("Error during SUBSCRIBE: " + error); 115 System.out.println("Exiting..."); 116 System.exit(1); 117 } 118 } 119 } 120 }); 121 122 123 124 client.handshake(); 125 System.out.println("Waiting for handshake"); 126 127 boolean handshaken = client.waitFor(10 * 1000, BayeuxClient.State.CONNECTED); 128 if (!handshaken) { 129 System.out.println("Failed to handshake: " + client); 130 System.exit(1); 131 } 132 133 134 135 System.out.println("Subscribing for channel: " + CHANNEL); 136 137 client.getChannel(CHANNEL).subscribe(new MessageListener() { 138 @Override 139 public void onMessage(ClientSessionChannel channel, Message message) { 140 System.out.println("Received Message: " + message); 141 } 142 }); 143 144 System.out.println("Waiting for streamed data from your organization ..."); 145 while (true) { 146 // This infinite loop is for demo only, 147 // to receive streamed events on the 148 // specified topic from your organization. 149 } 150 } 151 152 153 154 private static BayeuxClient makeClient() throws Exception { 155 HttpClient httpClient = new HttpClient(); 156 httpClient.setConnectTimeout(CONNECTION_TIMEOUT); 157 httpClient.setTimeout(READ_TIMEOUT); 158 httpClient.start(); 159 160 String[] pair = SoapLoginUtil.login(httpClient, USER_NAME, PASSWORD); 161 162 if (pair == null) { 163 System.exit(1); 164 } 165 166 assert pair.length == 2; 167 final String sessionid = pair[0]; 168 String endpoint = pair[1]; 169 System.out.println("Login successful!\nEndpoint: " + endpoint 170 + "\nSessionid=" + sessionid); 171 172 Map<String, Object> options = new HashMap<String, Object>(); 173 options.put(ClientTransport.TIMEOUT_OPTION, READ_TIMEOUT); 174 LongPollingTransport transport = new LongPollingTransport( 175 options, httpClient) { 176 177 @Override 178 protected void customize(ContentExchange exchange) { 179 super.customize(exchange); 180 exchange.addRequestHeader("Authorization", "OAuth " + sessionid); 181 } 182 }; 183 184 BayeuxClient client = new BayeuxClient(salesforceStreamingEndpoint( 185 endpoint), transport); 186 if (USE_COOKIES) establishCookies(client, USER_NAME, sessionid); 187 return client; 188 } 189 190 private static String salesforceStreamingEndpoint(String endpoint) 191 throws MalformedURLException { 192 return new URL(endpoint + STREAMING_ENDPOINT_URI).toExternalForm(); 193 } 194 195 private static void establishCookies(BayeuxClient client, String user, 196 String sid) { 197 client.setCookie("com.salesforce.LocaleInfo", "us", 24 * 60 * 60 * 1000); 198 client.setCookie("login", user, 24 * 60 * 60 * 1000); 199 client.setCookie("sid", sid, 24 * 60 * 60 * 1000); 200 client.setCookie("language", "en_US", 24 * 60 * 60 * 1000); 201 } 202} 203 204 - Edit StreamingClientExample.java and
modify the following values:
File Name Static Resource Name USER_NAME Username of the logged-in user PASSWORD Password for the USER_NAME (or logged-in user) CHANNEL /topic/InvoiceStatementUpdates LOGIN_ENDPOINT https://test.salesforce.com (Only if you are using a sandbox. If you are in a production organization, no change is required for LOGIN_ENDPOINT.) - Add the following code to a Java source file named SoapLoginUtil.java. This code sends a username and password
to the server and receives the session ID.
1swfobject.registerObject("clippy.codeblock-1", "9");package demo; 2 3import java.io.ByteArrayInputStream; 4import java.io.IOException; 5import java.io.UnsupportedEncodingException; 6import java.net.MalformedURLException; 7import java.net.URL; 8 9import org.eclipse.jetty.client.ContentExchange; 10import org.eclipse.jetty.client.HttpClient; 11import org.xml.sax.Attributes; 12import org.xml.sax.SAXException; 13import org.xml.sax.helpers.DefaultHandler; 14 15import javax.xml.parsers.ParserConfigurationException; 16import javax.xml.parsers.SAXParser; 17import javax.xml.parsers.SAXParserFactory; 18 19public final class SoapLoginUtil { 20 21 // The enterprise SOAP API endpoint used for the login call in this example. 22 private static final String SERVICES_SOAP_PARTNER_ENDPOINT = "/services/Soap/u/22.0/"; 23 24 private static final String ENV_START = 25 "<soapenv:Envelope xmlns:soapenv='http://schemas.xmlsoap.org/soap/envelope/' " 26 + "xmlns:xsi='http://www.w3.org/2001/XMLSchema-instance' " + 27 "xmlns:urn='urn:partner.soap.sforce.com'><soapenv:Body>"; 28 29 private static final String ENV_END = "</soapenv:Body></soapenv:Envelope>"; 30 31 private static byte[] soapXmlForLogin(String username, String password) 32 throws UnsupportedEncodingException { 33 return (ENV_START + 34 " <urn:login>" + 35 " <urn:username>" + username + "</urn:username>" + 36 " <urn:password>" + password + "</urn:password>" + 37 " </urn:login>" + 38 ENV_END).getBytes("UTF-8"); 39 } 40 41 public static String[] login(HttpClient client, String username, String password) 42 throws IOException, InterruptedException, SAXException, 43 ParserConfigurationException { 44 45 ContentExchange exchange = new ContentExchange(); 46 exchange.setMethod("POST"); 47 exchange.setURL(getSoapURL()); 48 exchange.setRequestContentSource(new ByteArrayInputStream(soapXmlForLogin( 49 username, password))); 50 exchange.setRequestHeader("Content-Type", "text/xml"); 51 exchange.setRequestHeader("SOAPAction", "''"); 52 exchange.setRequestHeader("PrettyPrint", "Yes"); 53 54 client.send(exchange); 55 exchange.waitForDone(); 56 String response = exchange.getResponseContent(); 57 58 SAXParserFactory spf = SAXParserFactory.newInstance(); 59 spf.setNamespaceAware(true); 60 SAXParser saxParser = spf.newSAXParser(); 61 62 LoginResponseParser parser = new LoginResponseParser(); 63 saxParser.parse(new ByteArrayInputStream( 64 response.getBytes("UTF-8")), parser); 65 66 if (parser.sessionId == null || parser.serverUrl == null) { 67 System.out.println("Login Failed!\n" + response); 68 return null; 69 } 70 71 URL soapEndpoint = new URL(parser.serverUrl); 72 StringBuilder endpoint = new StringBuilder() 73 .append(soapEndpoint.getProtocol()) 74 .append("://") 75 .append(soapEndpoint.getHost()); 76 77 if (soapEndpoint.getPort() > 0) endpoint.append(":") 78 .append(soapEndpoint.getPort()); 79 return new String[] {parser.sessionId, endpoint.toString()}; 80 } 81 82 private static String getSoapURL() throws MalformedURLException { 83 return new URL(StreamingClientExample.LOGIN_ENDPOINT + 84 getSoapUri()).toExternalForm(); 85 } 86 87 private static String getSoapUri() { 88 return SERVICES_SOAP_PARTNER_ENDPOINT; 89 } 90 91 private static class LoginResponseParser extends DefaultHandler { 92 93 private boolean inSessionId; 94 private String sessionId; 95 96 private boolean inServerUrl; 97 private String serverUrl; 98 99 @Override 100 public void characters(char[] ch, int start, int length) { 101 if (inSessionId) sessionId = new String(ch, start, length); 102 if (inServerUrl) serverUrl = new String(ch, start, length); 103 } 104 105 @Override 106 public void endElement(String uri, String localName, String qName) { 107 if (localName != null) { 108 if (localName.equals("sessionId")) { 109 inSessionId = false; 110 } 111 112 if (localName.equals("serverUrl")) { 113 inServerUrl = false; 114 } 115 } 116 } 117 118 @Override 119 public void startElement(String uri, String localName, 120 String qName, Attributes attributes) { 121 if (localName != null) { 122 if (localName.equals("sessionId")) { 123 inSessionId = true; 124 } 125 126 if (localName.equals("serverUrl")) { 127 inServerUrl = true; 128 } 129 } 130 } 131 } 132} - In a different browser window, create or modify an InvoiceStatement.
After you create or change data that corresponds to the query in your
PushTopic, the output looks something like this:
1Running streaming client example.... 2Login successful! 3Endpoint: https://www.salesforce.com 4Sessionid=00DD0000000FSp9!AQIAQIVjGYijFhiAROTc455T6kEVeJGXuW5VCnp 5 LANCMawS7.p5fXbjYlqCgx7They_zFjmP5n9HxvfUA6xGSGtC1Nb6P4S. 6 7Waiting for handshake 8[CHANNEL:META_HANDSHAKE]: 9{ 10 "id":"1", 11 "minimumVersion":"1.0", 12 "supportedConnectionTypes":["long-polling"], 13 "successful":true, 14 "channel":"/meta/handshake", 15 "clientId":"31t0cjzfbgnfqn1rggumba0k98u", 16 "version":"1.0" 17} 18 19[CHANNEL:META_CONNECT]: 20{ 21 "id":"2", 22 "successful":true, 23 "advice":{"interval":0,"reconnect":"retry","timeout":110000}, 24 "channel":"/meta/connect"} 25 Subscribing for channel: /topic/InvoiceStatementUpdates 26 Waiting for streamed data from your organization ... 27[CHANNEL:META_SUBSCRIBE]: 28{ 29 "id":"4", 30 "subscription":"/topic/InvoiceStatementUpdates", 31 "successful":true, 32 "channel":"/meta/subscribe" 33} 34 35[CHANNEL:META_CONNECT]: 36{ 37 "id":"3", 38 "successful":true, 39 "channel":"/meta/connect" 40} 41 42Received Message: 43{ 44"data": 45 { 46 "sobject": 47 { 48 "Name":"INV-0002", 49 "Id":"001D000000J3fTHIAZ", 50 "Status__c":"Pending"}, 51 "event":{"type":"updated", 52 "createdDate":"2011-09-06T18:51:08.000+0000" 53 } 54 }, 55 "channel":"/topic/InvoiceStatementUpdates" 56} 57 58[CHANNEL:META_CONNECT]: 59{ 60 "id":"5", 61 "successful":true, 62 "channel":"/meta/connect" 63}