Newer Version Available
Create a Java Client
Create a Java client that uses Bayeux and CometD to subscribe
to the channel.
- Download and install the CometD and Jetty .jar files if necessary.
-
In a new Java project, add the following code to a Java
source file named StreamingClientExample.java. This code subscribes to the Streaming channel you created and listens
for notifications. Depending on your Java development environment,
you might have to rename this file and class to Main.
1swfobject.registerObject("clippy.codeblock-0", "9"); 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17package demo; 18 19import org.cometd.bayeux.Channel; 20import org.cometd.bayeux.Message; 21import org.cometd.bayeux.client.ClientSessionChannel; 22import org.cometd.bayeux.client.ClientSessionChannel.MessageListener; 23import org.cometd.client.BayeuxClient; 24import org.cometd.client.transport.ClientTransport; 25import org.cometd.client.transport.LongPollingTransport; 26 27import org.eclipse.jetty.client.ContentExchange; 28import org.eclipse.jetty.client.HttpClient; 29 30import java.net.MalformedURLException; 31import java.net.URL; 32import java.util.HashMap; 33import java.util.Map; 34 35/** 36 * This example demonstrates how a streaming client works 37 * against the Salesforce Streaming API with generic notifications. 38 **/ 39 40public class StreamingClientExample { 41 42 // This URL is used only for logging in. The LoginResult 43 // returns a serverUrl which is then used for constructing 44 // the streaming URL. The serverUrl points to the endpoint 45 // where your organization is hosted. 46 47 static final String LOGIN_ENDPOINT = "https://login.salesforce.com"; 48 private static final String USER_NAME = "change_this_to_your_testuser@yourcompany.com"; 49 private static final String PASSWORD = "change_this_to_your_testpassword"; 50 // NOTE: Putting passwords in code is not a good practice and not recommended. 51 52 // The channel to subscribe to. 53 // Be sure to create the StreamingChannel before running this sample. 54 private static final String CHANNEL = "/u/notifications/ExampleUserChannel"; 55 private static final String STREAMING_ENDPOINT_URI = "/cometd/34.0"; 56 57 // The long poll duration. 
58 private static final int CONNECTION_TIMEOUT = 20 * 1000; // milliseconds 59 private static final int READ_TIMEOUT = 120 * 1000; // milliseconds 60 61 public static void main(String[] args) throws Exception { 62 63 System.out.println("Running streaming client example...."); 64 65 final BayeuxClient client = makeClient(); 66 client.getChannel(Channel.META_HANDSHAKE).addListener 67 (new ClientSessionChannel.MessageListener() { 68 69 public void onMessage(ClientSessionChannel channel, Message message) { 70 71 System.out.println("[CHANNEL:META_HANDSHAKE]: " + message); 72 73 boolean success = message.isSuccessful(); 74 if (!success) { 75 String error = (String) message.get("error"); 76 if (error != null) { 77 System.out.println("Error during HANDSHAKE: " + error); 78 System.out.println("Exiting..."); 79 System.exit(1); 80 } 81 82 Exception exception = (Exception) message.get("exception"); 83 if (exception != null) { 84 System.out.println("Exception during HANDSHAKE: "); 85 exception.printStackTrace(); 86 System.out.println("Exiting..."); 87 System.exit(1); 88 89 } 90 } 91 } 92 93 }); 94 95 client.getChannel(Channel.META_CONNECT).addListener( 96 new ClientSessionChannel.MessageListener() { 97 public void onMessage(ClientSessionChannel channel, Message message) { 98 99 System.out.println("[CHANNEL:META_CONNECT]: " + message); 100 101 boolean success = message.isSuccessful(); 102 if (!success) { 103 String error = (String) message.get("error"); 104 if (error != null) { 105 System.out.println("Error during CONNECT: " + error); 106 System.out.println("Exiting..."); 107 System.exit(1); 108 } 109 } 110 } 111 112 }); 113 114 client.getChannel(Channel.META_SUBSCRIBE).addListener( 115 new ClientSessionChannel.MessageListener() { 116 117 public void onMessage(ClientSessionChannel channel, Message message) { 118 119 System.out.println("[CHANNEL:META_SUBSCRIBE]: " + message); 120 boolean success = message.isSuccessful(); 121 if (!success) { 122 String error = (String) 
message.get("error"); 123 if (error != null) { 124 System.out.println("Error during SUBSCRIBE: " + error); 125 System.out.println("Exiting..."); 126 System.exit(1); 127 } 128 } 129 } 130 }); 131 132 133 134 client.handshake(); 135 System.out.println("Waiting for handshake"); 136 137 boolean handshaken = client.waitFor(10 * 1000, BayeuxClient.State.CONNECTED); 138 if (!handshaken) { 139 System.out.println("Failed to handshake: " + client); 140 System.exit(1); 141 } 142 143 144 145 System.out.println("Subscribing for channel: " + CHANNEL); 146 147 client.getChannel(CHANNEL).subscribe(new MessageListener() { 148 @Override 149 public void onMessage(ClientSessionChannel channel, Message message) { 150 System.out.println("Received Message: " + message); 151 } 152 }); 153 154 System.out.println("Waiting for streamed data from your organization ..."); 155 while (true) { 156 // This infinite loop is for demo only, 157 // to receive streamed events on the 158 // specified topic from your organization. 
159 } 160 } 161 162 163 164 private static BayeuxClient makeClient() throws Exception { 165 HttpClient httpClient = new HttpClient(); 166 httpClient.setConnectTimeout(CONNECTION_TIMEOUT); 167 httpClient.setTimeout(READ_TIMEOUT); 168 httpClient.start(); 169 170 String[] pair = SoapLoginUtil.login(httpClient, USER_NAME, PASSWORD); 171 172 if (pair == null) { 173 System.exit(1); 174 } 175 176 assert pair.length == 2; 177 final String sessionid = pair[0]; 178 String endpoint = pair[1]; 179 System.out.println("Login successful!\nEndpoint: " + endpoint 180 + "\nSessionid=" + sessionid); 181 182 Map<String, Object> options = new HashMap<String, Object>(); 183 options.put(ClientTransport.TIMEOUT_OPTION, READ_TIMEOUT); 184 LongPollingTransport transport = new LongPollingTransport( 185 options, httpClient) { 186 187 @Override 188 protected void customize(ContentExchange exchange) { 189 super.customize(exchange); 190 exchange.addRequestHeader("Authorization", "OAuth " + sessionid); 191 } 192 }; 193 194 BayeuxClient client = new BayeuxClient(salesforceStreamingEndpoint( 195 endpoint), transport); 196 return client; 197 } 198 199 private static String salesforceStreamingEndpoint(String endpoint) 200 throws MalformedURLException { 201 return new URL(endpoint + STREAMING_ENDPOINT_URI).toExternalForm(); 202 } 203 204} 205 206 -
Edit StreamingClientExample.java and
modify the following values:
| File Name | Static Resource Name |
| --- | --- |
| USER_NAME | Username of the logged-in user |
| PASSWORD | Password for the USER_NAME (or logged-in user) |
| CHANNEL | /u/notifications/ExampleUserChannel |
| LOGIN_ENDPOINT | https://test.salesforce.com (Only if you are using a sandbox. If you are in a production organization, no change is required for LOGIN_ENDPOINT.) |
-
Add the following code to a Java source file named SoapLoginUtil.java. This code sends a username and password
to the server and receives the session ID.
1swfobject.registerObject("clippy.codeblock-1", "9"); 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17package demo; 18 19import java.io.ByteArrayInputStream; 20import java.io.IOException; 21import java.io.UnsupportedEncodingException; 22import java.net.MalformedURLException; 23import java.net.URL; 24 25import org.eclipse.jetty.client.ContentExchange; 26import org.eclipse.jetty.client.HttpClient; 27import org.xml.sax.Attributes; 28import org.xml.sax.SAXException; 29import org.xml.sax.helpers.DefaultHandler; 30 31import javax.xml.parsers.ParserConfigurationException; 32import javax.xml.parsers.SAXParser; 33import javax.xml.parsers.SAXParserFactory; 34 35public final class SoapLoginUtil { 36 37 // The enterprise SOAP API endpoint used for the login call in this example. 38 private static final String SERVICES_SOAP_PARTNER_ENDPOINT = "/services/Soap/u/22.0/"; 39 40 private static final String ENV_START = 41 "<soapenv:Envelope xmlns:soapenv='http://schemas.xmlsoap.org/soap/envelope/' " 42 + "xmlns:xsi='http://www.w3.org/2001/XMLSchema-instance' " + 43 "xmlns:urn='urn:partner.soap.sforce.com'><soapenv:Body>"; 44 45 private static final String ENV_END = "</soapenv:Body></soapenv:Envelope>"; 46 47 private static byte[] soapXmlForLogin(String username, String password) 48 throws UnsupportedEncodingException { 49 return (ENV_START + 50 " <urn:login>" + 51 " <urn:username>" + username + "</urn:username>" + 52 " <urn:password>" + password + "</urn:password>" + 53 " </urn:login>" + 54 ENV_END).getBytes("UTF-8"); 55 } 56 57 public static String[] login(HttpClient client, String username, String password) 58 throws IOException, InterruptedException, SAXException, 59 ParserConfigurationException { 60 61 ContentExchange exchange = new ContentExchange(); 62 exchange.setMethod("POST"); 63 exchange.setURL(getSoapURL()); 64 exchange.setRequestContentSource(new ByteArrayInputStream(soapXmlForLogin( 65 username, password))); 66 exchange.setRequestHeader("Content-Type", "text/xml"); 67 
exchange.setRequestHeader("SOAPAction", "''"); 68 exchange.setRequestHeader("PrettyPrint", "Yes"); 69 70 client.send(exchange); 71 exchange.waitForDone(); 72 String response = exchange.getResponseContent(); 73 74 SAXParserFactory spf = SAXParserFactory.newInstance(); 75 spf.setNamespaceAware(true); 76 SAXParser saxParser = spf.newSAXParser(); 77 78 LoginResponseParser parser = new LoginResponseParser(); 79 saxParser.parse(new ByteArrayInputStream( 80 response.getBytes("UTF-8")), parser); 81 82 if (parser.sessionId == null || parser.serverUrl == null) { 83 System.out.println("Login Failed!\n" + response); 84 return null; 85 } 86 87 URL soapEndpoint = new URL(parser.serverUrl); 88 StringBuilder endpoint = new StringBuilder() 89 .append(soapEndpoint.getProtocol()) 90 .append("://") 91 .append(soapEndpoint.getHost()); 92 93 if (soapEndpoint.getPort() > 0) endpoint.append(":") 94 .append(soapEndpoint.getPort()); 95 return new String[] {parser.sessionId, endpoint.toString()}; 96 } 97 98 private static String getSoapURL() throws MalformedURLException { 99 return new URL(StreamingClientExample.LOGIN_ENDPOINT + 100 getSoapUri()).toExternalForm(); 101 } 102 103 private static String getSoapUri() { 104 return SERVICES_SOAP_PARTNER_ENDPOINT; 105 } 106 107 private static class LoginResponseParser extends DefaultHandler { 108 109 private boolean inSessionId; 110 private String sessionId; 111 112 private boolean inServerUrl; 113 private String serverUrl; 114 115 @Override 116 public void characters(char[] ch, int start, int length) { 117 if (inSessionId) sessionId = new String(ch, start, length); 118 if (inServerUrl) serverUrl = new String(ch, start, length); 119 } 120 121 @Override 122 public void endElement(String uri, String localName, String qName) { 123 if (localName != null) { 124 if (localName.equals("sessionId")) { 125 inSessionId = false; 126 } 127 128 if (localName.equals("serverUrl")) { 129 inServerUrl = false; 130 } 131 } 132 } 133 134 @Override 135 public void 
startElement(String uri, String localName, 136 String qName, Attributes attributes) { 137 if (localName != null) { 138 if (localName.equals("sessionId")) { 139 inSessionId = true; 140 } 141 142 if (localName.equals("serverUrl")) { 143 inServerUrl = true; 144 } 145 } 146 } 147 } 148} -
When you run this client app and generate notifications
using the REST resource, the output will look something like:
Running streaming client example....
Login successful!
Endpoint: https://www.salesforce.com
Sessionid=00DD0000000FSp9!AQIAQIVjGYijFhiAROTc455T6kEVeJGXuW5VCnp
    LANCMawS7.p5fXbjYlqCgx7They_zFjmP5n9HxvfUA6xGSGtC1Nb6P4S.

Waiting for handshake
[CHANNEL:META_HANDSHAKE]:
{
  "id":"1",
  "minimumVersion":"1.0",
  "supportedConnectionTypes":["long-polling"],
  "successful":true,
  "channel":"/meta/handshake",
  "clientId":"31t0cjzfbgnfqn1rggumba0k98u",
  "version":"1.0"
}

[CHANNEL:META_CONNECT]:
{
  "id":"2",
  "successful":true,
  "advice":{"interval":0,"reconnect":"retry","timeout":110000},
  "channel":"/meta/connect"}
Subscribing for channel: /u/notifications/ExampleUserChannel
Waiting for streamed data from your organization ...
[CHANNEL:META_SUBSCRIBE]:
{
  "id":"4",
  "subscription":"/u/notifications/ExampleUserChannel",
  "successful":true,
  "channel":"/meta/subscribe"
}

[CHANNEL:META_CONNECT]:
{
  "id":"3",
  "successful":true,
  "channel":"/meta/connect"
}

Received Message:
{
  "data":
  {
    "event":
    {
      "createdDate":"2013-07-30T23:15:59.000+0000"
    },
    "payload":"Broadcast message to all subscribers"
  },
  "channel":"/u/notifications/ExampleUserChannel",
  "clientId":"8173z2cplh8q6m1rmud93zygnf8"
}
[CHANNEL:META_CONNECT]:
{
  "id":"5",
  "successful":true,
  "channel":"/meta/connect"
}