No Results
Search Tips:
- Please consider misspellings
- Try different search keywords
Newer Version Available
Create a Java Client
Create a Java client that uses Bayeux and CometD to subscribe
to the channel.
- Download and install the CometD and Jetty .jar files if necessary.
- In a new Java project, add the following code to a Java
source file named StreamingClientExample.java. This code subscribes to the Streaming channel you created and listens
for notifications. Depending on your Java development environment,
you might have to rename this file and class to Main.
1swfobject.registerObject("clippy.codeblock-0", "9");package demo; 2 3import org.cometd.bayeux.Channel; 4import org.cometd.bayeux.Message; 5import org.cometd.bayeux.client.ClientSessionChannel; 6import org.cometd.bayeux.client.ClientSessionChannel.MessageListener; 7import org.cometd.client.BayeuxClient; 8import org.cometd.client.transport.ClientTransport; 9import org.cometd.client.transport.LongPollingTransport; 10 11import org.eclipse.jetty.client.ContentExchange; 12import org.eclipse.jetty.client.HttpClient; 13 14import java.net.MalformedURLException; 15import java.net.URL; 16import java.util.HashMap; 17import java.util.Map; 18 19/** 20 * This example demonstrates how a streaming client works 21 * against the Salesforce Streaming API with generic notifications. 22 **/ 23 24public class StreamingClientExample { 25 26 // This URL is used only for logging in. The LoginResult 27 // returns a serverUrl which is then used for constructing 28 // the streaming URL. The serverUrl points to the endpoint 29 // where your organization is hosted. 30 31 static final String LOGIN_ENDPOINT = "https://login.salesforce.com"; 32 private static final String USER_NAME = "change_this_to_your_testuser@yourcompany.com"; 33 private static final String PASSWORD = "change_this_to_your_testpassword"; 34 // NOTE: Putting passwords in code is not a good practice and not recommended. 35 36 // The channel to subscribe to. 37 // Be sure to create the StreamingChannel before running this sample. 38 private static final String CHANNEL = "/u/notifications/ExampleUserChannel"; 39 private static final String STREAMING_ENDPOINT_URI = "/cometd/30.0"; 40 41 // The long poll duration. 42 private static final int CONNECTION_TIMEOUT = 20 * 1000; // milliseconds 43 private static final int READ_TIMEOUT = 120 * 1000; // milliseconds 44 45 public static void main(String[] args) throws Exception { 46 47 System.out.println("Running streaming client example...."); 48 49 final BayeuxClient client = makeClient(); 50 client.getChannel(Channel.META_HANDSHAKE).addListener 51 (new ClientSessionChannel.MessageListener() { 52 53 public void onMessage(ClientSessionChannel channel, Message message) { 54 55 System.out.println("[CHANNEL:META_HANDSHAKE]: " + message); 56 57 boolean success = message.isSuccessful(); 58 if (!success) { 59 String error = (String) message.get("error"); 60 if (error != null) { 61 System.out.println("Error during HANDSHAKE: " + error); 62 System.out.println("Exiting..."); 63 System.exit(1); 64 } 65 66 Exception exception = (Exception) message.get("exception"); 67 if (exception != null) { 68 System.out.println("Exception during HANDSHAKE: "); 69 exception.printStackTrace(); 70 System.out.println("Exiting..."); 71 System.exit(1); 72 73 } 74 } 75 } 76 77 }); 78 79 client.getChannel(Channel.META_CONNECT).addListener( 80 new ClientSessionChannel.MessageListener() { 81 public void onMessage(ClientSessionChannel channel, Message message) { 82 83 System.out.println("[CHANNEL:META_CONNECT]: " + message); 84 85 boolean success = message.isSuccessful(); 86 if (!success) { 87 String error = (String) message.get("error"); 88 if (error != null) { 89 System.out.println("Error during CONNECT: " + error); 90 System.out.println("Exiting..."); 91 System.exit(1); 92 } 93 } 94 } 95 96 }); 97 98 client.getChannel(Channel.META_SUBSCRIBE).addListener( 99 new ClientSessionChannel.MessageListener() { 100 101 public void onMessage(ClientSessionChannel channel, Message message) { 102 103 System.out.println("[CHANNEL:META_SUBSCRIBE]: " + message); 104 boolean success = message.isSuccessful(); 105 if (!success) { 106 String error = (String) message.get("error"); 107 if (error != null) { 108 System.out.println("Error during SUBSCRIBE: " + error); 109 System.out.println("Exiting..."); 110 System.exit(1); 111 } 112 } 113 } 114 }); 115 116 117 118 client.handshake(); 119 System.out.println("Waiting for handshake"); 120 121 boolean handshaken = client.waitFor(10 * 1000, BayeuxClient.State.CONNECTED); 122 if (!handshaken) { 123 System.out.println("Failed to handshake: " + client); 124 System.exit(1); 125 } 126 127 128 129 System.out.println("Subscribing for channel: " + CHANNEL); 130 131 client.getChannel(CHANNEL).subscribe(new MessageListener() { 132 @Override 133 public void onMessage(ClientSessionChannel channel, Message message) { 134 System.out.println("Received Message: " + message); 135 } 136 }); 137 138 System.out.println("Waiting for streamed data from your organization ..."); 139 while (true) { 140 // This infinite loop is for demo only, 141 // to receive streamed events on the 142 // specified topic from your organization. 143 } 144 } 145 146 147 148 private static BayeuxClient makeClient() throws Exception { 149 HttpClient httpClient = new HttpClient(); 150 httpClient.setConnectTimeout(CONNECTION_TIMEOUT); 151 httpClient.setTimeout(READ_TIMEOUT); 152 httpClient.start(); 153 154 String[] pair = SoapLoginUtil.login(httpClient, USER_NAME, PASSWORD); 155 156 if (pair == null) { 157 System.exit(1); 158 } 159 160 assert pair.length == 2; 161 final String sessionid = pair[0]; 162 String endpoint = pair[1]; 163 System.out.println("Login successful!\nEndpoint: " + endpoint 164 + "\nSessionid=" + sessionid); 165 166 Map<String, Object> options = new HashMap<String, Object>(); 167 options.put(ClientTransport.TIMEOUT_OPTION, READ_TIMEOUT); 168 LongPollingTransport transport = new LongPollingTransport( 169 options, httpClient) { 170 171 @Override 172 protected void customize(ContentExchange exchange) { 173 super.customize(exchange); 174 exchange.addRequestHeader("Authorization", "OAuth " + sessionid); 175 } 176 }; 177 178 BayeuxClient client = new BayeuxClient(salesforceStreamingEndpoint( 179 endpoint), transport); 180 return client; 181 } 182 183 private static String salesforceStreamingEndpoint(String endpoint) 184 throws MalformedURLException { 185 return new URL(endpoint + STREAMING_ENDPOINT_URI).toExternalForm(); 186 } 187 188} 189 190 - Edit StreamingClientExample.java and
modify the following values:
File Name Static Resource Name USER_NAME Username of the logged-in user PASSWORD Password for the USER_NAME (or logged-in user) CHANNEL /u/notifications/ExampleUserChannel LOGIN_ENDPOINT https://test.salesforce.com (Only if you are using a sandbox. If you are in a production organization, no change is required for LOGIN_ENDPOINT.) - Add the following code to a Java source file named SoapLoginUtil.java. This code sends a username and password
to the server and receives the session ID.
1swfobject.registerObject("clippy.codeblock-1", "9");package demo; 2 3import java.io.ByteArrayInputStream; 4import java.io.IOException; 5import java.io.UnsupportedEncodingException; 6import java.net.MalformedURLException; 7import java.net.URL; 8 9import org.eclipse.jetty.client.ContentExchange; 10import org.eclipse.jetty.client.HttpClient; 11import org.xml.sax.Attributes; 12import org.xml.sax.SAXException; 13import org.xml.sax.helpers.DefaultHandler; 14 15import javax.xml.parsers.ParserConfigurationException; 16import javax.xml.parsers.SAXParser; 17import javax.xml.parsers.SAXParserFactory; 18 19public final class SoapLoginUtil { 20 21 // The enterprise SOAP API endpoint used for the login call in this example. 22 private static final String SERVICES_SOAP_PARTNER_ENDPOINT = "/services/Soap/u/22.0/"; 23 24 private static final String ENV_START = 25 "<soapenv:Envelope xmlns:soapenv='http://schemas.xmlsoap.org/soap/envelope/' " 26 + "xmlns:xsi='http://www.w3.org/2001/XMLSchema-instance' " + 27 "xmlns:urn='urn:partner.soap.sforce.com'><soapenv:Body>"; 28 29 private static final String ENV_END = "</soapenv:Body></soapenv:Envelope>"; 30 31 private static byte[] soapXmlForLogin(String username, String password) 32 throws UnsupportedEncodingException { 33 return (ENV_START + 34 " <urn:login>" + 35 " <urn:username>" + username + "</urn:username>" + 36 " <urn:password>" + password + "</urn:password>" + 37 " </urn:login>" + 38 ENV_END).getBytes("UTF-8"); 39 } 40 41 public static String[] login(HttpClient client, String username, String password) 42 throws IOException, InterruptedException, SAXException, 43 ParserConfigurationException { 44 45 ContentExchange exchange = new ContentExchange(); 46 exchange.setMethod("POST"); 47 exchange.setURL(getSoapURL()); 48 exchange.setRequestContentSource(new ByteArrayInputStream(soapXmlForLogin( 49 username, password))); 50 exchange.setRequestHeader("Content-Type", "text/xml"); 51 exchange.setRequestHeader("SOAPAction", "''"); 52 exchange.setRequestHeader("PrettyPrint", "Yes"); 53 54 client.send(exchange); 55 exchange.waitForDone(); 56 String response = exchange.getResponseContent(); 57 58 SAXParserFactory spf = SAXParserFactory.newInstance(); 59 spf.setNamespaceAware(true); 60 SAXParser saxParser = spf.newSAXParser(); 61 62 LoginResponseParser parser = new LoginResponseParser(); 63 saxParser.parse(new ByteArrayInputStream( 64 response.getBytes("UTF-8")), parser); 65 66 if (parser.sessionId == null || parser.serverUrl == null) { 67 System.out.println("Login Failed!\n" + response); 68 return null; 69 } 70 71 URL soapEndpoint = new URL(parser.serverUrl); 72 StringBuilder endpoint = new StringBuilder() 73 .append(soapEndpoint.getProtocol()) 74 .append("://") 75 .append(soapEndpoint.getHost()); 76 77 if (soapEndpoint.getPort() > 0) endpoint.append(":") 78 .append(soapEndpoint.getPort()); 79 return new String[] {parser.sessionId, endpoint.toString()}; 80 } 81 82 private static String getSoapURL() throws MalformedURLException { 83 return new URL(StreamingClientExample.LOGIN_ENDPOINT + 84 getSoapUri()).toExternalForm(); 85 } 86 87 private static String getSoapUri() { 88 return SERVICES_SOAP_PARTNER_ENDPOINT; 89 } 90 91 private static class LoginResponseParser extends DefaultHandler { 92 93 private boolean inSessionId; 94 private String sessionId; 95 96 private boolean inServerUrl; 97 private String serverUrl; 98 99 @Override 100 public void characters(char[] ch, int start, int length) { 101 if (inSessionId) sessionId = new String(ch, start, length); 102 if (inServerUrl) serverUrl = new String(ch, start, length); 103 } 104 105 @Override 106 public void endElement(String uri, String localName, String qName) { 107 if (localName != null) { 108 if (localName.equals("sessionId")) { 109 inSessionId = false; 110 } 111 112 if (localName.equals("serverUrl")) { 113 inServerUrl = false; 114 } 115 } 116 } 117 118 @Override 119 public void startElement(String uri, String localName, 120 String qName, Attributes attributes) { 121 if (localName != null) { 122 if (localName.equals("sessionId")) { 123 inSessionId = true; 124 } 125 126 if (localName.equals("serverUrl")) { 127 inServerUrl = true; 128 } 129 } 130 } 131 } 132} - When you run this client app and generate notifications
using the REST resource, the output will look something like:
1Running streaming client example.... 2Login successful! 3Endpoint: https://www.salesforce.com 4Sessionid=00DD0000000FSp9!AQIAQIVjGYijFhiAROTc455T6kEVeJGXuW5VCnp 5 LANCMawS7.p5fXbjYlqCgx7They_zFjmP5n9HxvfUA6xGSGtC1Nb6P4S. 6 7Waiting for handshake 8[CHANNEL:META_HANDSHAKE]: 9{ 10 "id":"1", 11 "minimumVersion":"1.0", 12 "supportedConnectionTypes":["long-polling"], 13 "successful":true, 14 "channel":"/meta/handshake", 15 "clientId":"31t0cjzfbgnfqn1rggumba0k98u", 16 "version":"1.0" 17} 18 19[CHANNEL:META_CONNECT]: 20{ 21 "id":"2", 22 "successful":true, 23 "advice":{"interval":0,"reconnect":"retry","timeout":110000}, 24 "channel":"/meta/connect"} 25 Subscribing for channel: /u/notifications/ExampleUserChannel 26 Waiting for streamed data from your organization ... 27[CHANNEL:META_SUBSCRIBE]: 28{ 29 "id":"4", 30 "subscription":"/u/notifications/ExampleUserChannel", 31 "successful":true, 32 "channel":"/meta/subscribe" 33} 34 35[CHANNEL:META_CONNECT]: 36{ 37 "id":"3", 38 "successful":true, 39 "channel":"/meta/connect" 40} 41 42Received Message: 43{ 44 "data": 45 { 46 "event": 47 { 48 "createdDate":"2013-07-30T23:15:59.000+0000" 49 }, 50 "payload":"Broadcast message to all subscribers" 51 }, 52 "channel":"/u/notifications/ExampleUserChannel", 53 "clientId":"8173z2cplh8q6m1rmud93zygnf8" 54} 55[CHANNEL:META_CONNECT]: 56{ 57 "id":"5", 58 "successful":true, 59 "channel":"/meta/connect" 60}