1、BusServer每隔一段时间,就会产生一个TimeInfo的消息
package com.neohope.springcloud.test.busserver;
import java.util.Date;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.boot.context.annotation.DeterminableImports;
@Configuration
@ComponentScan
@EnableAutoConfiguration
@EnableBinding(Source.class)
public class AppSource {
private static Logger logger = LoggerFactory.getLogger(AppSource.class);
public static void main(String[] args) {
SpringApplication.run(AppSource.class, args);
}
@Bean
@InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "10000", maxMessagesPerPoll = "1"))
public MessageSource<TimeInfo> timerMessageSource() {
return new MessageSource<TimeInfo>() {
@Override
public Message<TimeInfo> receive() {
logger.info("Msg Sent");
return MessageBuilder.withPayload(new TimeInfo(new Date().getTime()+"","hiup")).build();
}
};
}
public static class TimeInfo{
private String time;
private String label;
public TimeInfo(String time, String label) {
super();
this.time = time;
this.label = label;
}
public String getTime() {
return time;
}
public String getLabel() {
return label;
}
}
}
2、pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.cloud.stream.app</groupId> <artifactId>app-starters-build</artifactId> <version>1.1.3.M1</version> <relativePath /> </parent> <groupId>com.neohope.springcloud.test</groupId> <artifactId>busserver</artifactId> <version>1.0.0</version> <packaging>jar</packaging> <name>busserver</name> <url>http://maven.apache.org</url> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>4.3.6.RELEASE</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-web</artifactId> <version>4.3.6.RELEASE</version> </dependency> <!--dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </dependency--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-kafka</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-kafka</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot</artifactId> <version>1.5.1.RELEASE</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-autoconfigure</artifactId> <version>1.5.1.RELEASE</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <scope>test</scope> </dependency> </dependencies> <repositories> <repository> <id>spring-milestones</id> <name>Spring Milestones</name> <url>https://repo.spring.io/libs-milestone</url> <snapshots> <enabled>false</enabled> </snapshots> </repository> </repositories> </project>
3、application.properties
server.port=-1 spring.cloud.stream.bindings.output.destination=timerTopic spring.cloud.stream.bindings.output.content-type=application/json spring.cloud.stream.kafka.binder.zkNodes=localhost #spring.cloud.stream.kafka.binder.brokers=localhost
4、BusClient会收到对应的消息并输出日志
package com.neohope.springcloud.test.busclient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.boot.SpringApplication;
@Configuration
@ComponentScan
@EnableAutoConfiguration
@EnableBinding(Sink.class)
public class AppSink
{
private static Logger logger = LoggerFactory.getLogger(AppSink.class);
public static void main(String[] args) {
SpringApplication.run(AppSink.class, args);
}
@StreamListener(Sink.INPUT)
public void loggerSink(SinkTimeInfo sinkTimeInfo) {
logger.info("Received: " + sinkTimeInfo.toString());
}
public static class SinkTimeInfo{
private String time;
private String label;
public String getTime() {
return time;
}
public void setTime(String time) {
this.time = time;
}
public void setSinkLabel(String label) {
this.label = label;
}
public String getLabel() {
return label;
}
@Override
public String toString() {
return "SinkTimeInfo [time=" + time + ", label=" + label + "]";
}
}
}
5、pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.cloud.stream.app</groupId> <artifactId>app-starters-build</artifactId> <version>1.1.3.M1</version> <relativePath /> </parent> <groupId>com.neohope.springcloud.test</groupId> <artifactId>busclient</artifactId> <version>1.0.0</version> <packaging>jar</packaging> <name>busclient</name> <url>http://maven.apache.org</url> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>4.3.6.RELEASE</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-web</artifactId> <version>4.3.6.RELEASE</version> </dependency> <!--dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </dependency--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-kafka</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-kafka</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot</artifactId> <version>1.5.1.RELEASE</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-autoconfigure</artifactId> <version>1.5.1.RELEASE</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <scope>test</scope> </dependency> </dependencies> <repositories> <repository> <id>spring-milestones</id> <name>Spring Milestones</name> <url>https://repo.spring.io/libs-milestone</url> <snapshots> <enabled>false</enabled> </snapshots> </repository> </repositories> </project>
6、application.properties
server.port=-1 spring.cloud.stream.bindings.input.destination=timerTopic spring.cloud.stream.bindings.input.content-type=application/json spring.cloud.stream.bindings.input.group=timerGroup spring.cloud.stream.binder.kafka.resetOffsets=true spring.cloud.stream.kafka.binder.zkNodes=localhost #spring.cloud.stream.kafka.binder.brokers=localhost