Update an existing microservice to send events to the Event Hub
You will now implement the functionality that lets you emulate sending events to the telemetry event hub. To do this, you will update the customers-service. Each time a new customer is created, the service sends a new event to the event hub, together with 100 new pet events (to simulate some dummy load).
- Sending and Receiving Message by Azure Event Hubs and Spring Cloud Stream Binder Eventhubs in Spring Boot Application
- Spring Cloud Stream with Azure Event Hubs
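As a rough illustration of the load described above, the following JDK-only sketch builds the payloads that one customer creation would produce: one owner event followed by 100 pet events. The class name `EventLoadSketch` and the method `payloadsFor` are hypothetical; they only mirror the message texts used later in OwnerResource, and nothing here talks to Event Hubs.

```java
import java.util.ArrayList;
import java.util.List;

class EventLoadSketch {

    // Number of dummy pet events sent per new customer, as stated in the lab text.
    static final int PET_EVENTS = 100;

    // Builds the payloads for one customer creation: 1 owner event + 100 pet events.
    // Hypothetical helper, not part of the petclinic code base.
    static List<String> payloadsFor(String firstName, String lastName) {
        List<String> payloads = new ArrayList<>();
        payloads.add("New owner created: " + firstName + " " + lastName + " with many pets ...");
        for (int i = 0; i < PET_EVENTS; i++) {
            payloads.add("Pet " + i);
        }
        return payloads;
    }

    public static void main(String[] args) {
        List<String> payloads = payloadsFor("George", "Franklin");
        System.out.println(payloads.size() + " events, first: " + payloads.get(0));
    }
}
```

Every submitted owner form therefore results in 101 messages on the event hub, which is what makes the peak in the portal graphs easy to spot later on.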
Step by step guidance
-
In the customers-service pom.xml file, add the following extra dependency:

    <dependency>
        <groupId>com.azure.spring</groupId>
        <artifactId>spring-cloud-azure-stream-binder-eventhubs</artifactId>
    </dependency>
-
Replace the contents of the src/main/java/org/springframework/samples/petclinic/customers/CustomersServiceApplication.java file with:

    package org.springframework.samples.petclinic.customers;

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
    import org.springframework.integration.annotation.ServiceActivator;
    import org.springframework.messaging.Message;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    /**
     * @author Maciej Szarlinski
     */
    @EnableDiscoveryClient
    @SpringBootApplication
    public class CustomersServiceApplication {

        private static final Logger LOGGER = LoggerFactory.getLogger(CustomersServiceApplication.class);

        public static void main(String[] args) {
            SpringApplication.run(CustomersServiceApplication.class, args);
        }

        @ServiceActivator(inputChannel = "telemetry.errors")
        public void producerError(Message<?> message) {
            LOGGER.error("Handling Producer ERROR: " + message);
        }
    }

This adds an extra logger and a method that is called when sending an event fails.
-
Replace the contents of the src/spring-petclinic-customers-service/src/main/java/org/springframework/samples/petclinic/customers/web/OwnerResource.java file with:

    package org.springframework.samples.petclinic.customers.web;

    import io.micrometer.core.annotation.Timed;
    import lombok.RequiredArgsConstructor;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.http.HttpStatus;
    import org.springframework.samples.petclinic.customers.model.Owner;
    import org.springframework.samples.petclinic.customers.model.OwnerRepository;
    import org.springframework.web.bind.annotation.*;

    import jakarta.validation.Valid;
    import jakarta.validation.constraints.Min;
    import java.util.List;
    import java.util.Optional;

    import reactor.core.publisher.Sinks;
    import reactor.core.publisher.Flux;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.MessageBuilder;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.context.annotation.Bean;
    import java.util.function.Supplier;
    import org.springframework.beans.factory.annotation.Autowired;

    /**
     * @author Juergen Hoeller
     * @author Ken Krebs
     * @author Arjen Poutsma
     * @author Michael Isvy
     * @author Maciej Szarlinski
     */
    @RequestMapping("/owners")
    @RestController
    @Timed("petclinic.owner")
    @RequiredArgsConstructor
    @Slf4j
    class OwnerResource {

        private final OwnerRepository ownerRepository;

        @Autowired
        private Sinks.Many<Message<String>> many;

        private static final Logger LOGGER = LoggerFactory.getLogger(OwnerResource.class);

        /**
         * Create Owner
         */
        @PostMapping
        @ResponseStatus(HttpStatus.CREATED)
        public Owner createOwner(@Valid @RequestBody Owner owner) {
            LOGGER.info("+++Sending events+++");
            many.emitNext(MessageBuilder.withPayload("New owner created: " + owner.getFirstName() + " " + owner.getLastName() + " with many pets ...").build(), Sinks.EmitFailureHandler.FAIL_FAST);
            for (int i = 0; i < 100; i++) {
                many.emitNext(MessageBuilder.withPayload("Pet " + i).build(), Sinks.EmitFailureHandler.FAIL_FAST);
            }
            return ownerRepository.save(owner);
        }

        /**
         * Read single Owner
         */
        @GetMapping(value = "/{ownerId}")
        public Optional<Owner> findOwner(@PathVariable("ownerId") @Min(1) int ownerId) {
            return ownerRepository.findById(ownerId);
        }

        /**
         * Read List of Owners
         */
        @GetMapping
        public List<Owner> findAll() {
            return ownerRepository.findAll();
        }

        /**
         * Update Owner
         */
        @PutMapping(value = "/{ownerId}")
        @ResponseStatus(HttpStatus.NO_CONTENT)
        public void updateOwner(@PathVariable("ownerId") @Min(1) int ownerId, @Valid @RequestBody Owner ownerRequest) {
            final Optional<Owner> owner = ownerRepository.findById(ownerId);
            final Owner ownerModel = owner.orElseThrow(() -> new ResourceNotFoundException("Owner " + ownerId + " not found"));

            // This is done by hand for simplicity purpose. In a real life use-case we should consider using MapStruct.
            ownerModel.setFirstName(ownerRequest.getFirstName());
            ownerModel.setLastName(ownerRequest.getLastName());
            ownerModel.setCity(ownerRequest.getCity());
            ownerModel.setAddress(ownerRequest.getAddress());
            ownerModel.setTelephone(ownerRequest.getTelephone());
            log.info("Saving owner {}", ownerModel);
            ownerRepository.save(ownerModel);
        }
    }

This adds an additional sink, to which the createOwner method adds events.
-
In the /src/spring-petclinic-customers-service/src/main/java/org/springframework/samples/petclinic/customers/config/ folder, add an extra ManualProducerConfiguration.java file with the below contents:

    package org.springframework.samples.petclinic.customers.config;

    import com.azure.spring.messaging.eventhubs.support.EventHubsHeaders;
    import com.azure.spring.messaging.AzureHeaders;
    import com.azure.spring.messaging.checkpoint.Checkpointer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.annotation.Profile;
    import org.springframework.messaging.Message;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Sinks;

    import java.util.function.Consumer;
    import java.util.function.Supplier;

    @Configuration
    public class ManualProducerConfiguration {

        private static final Logger LOGGER = LoggerFactory.getLogger(ManualProducerConfiguration.class);

        @Bean
        public Sinks.Many<Message<String>> many() {
            return Sinks.many().unicast().onBackpressureBuffer();
        }

        @Bean
        public Supplier<Flux<Message<String>>> supply(Sinks.Many<Message<String>> many) {
            return () -> many.asFlux()
                             .doOnNext(m -> LOGGER.info("Manually sending message {}", m))
                             .doOnError(t -> LOGGER.error("Error encountered", t));
        }
    }

The ManualProducerConfiguration uses the sink to send the events to the event hub.
-
Update the /src/spring-petclinic-customers-service/src/main/resources/application.yml file, and replace its contents with:
```yaml
spring:
  application:
    name: customers-service
  config:
    import: optional:configserver:${CONFIG_SERVER_URL:http://localhost:8888/}
  cloud:
    function:
      definition: supply

---
spring:
  config:
    activate:
      on-profile: docker
    import: configserver:http://config-server:8888
```
This adds an extra Spring Cloud function for the supply method in the `ManualProducerConfiguration` class.
-
Save the changes to all 4 files.
-
In the config repository, add the `spring.cloud.stream` configuration for sending messages to the event hub. Add this section right under the spring statement on line 10. Make sure to replace `<your-event-hub-namespace>` with the name of the namespace you created.
```yaml
  cloud:
    stream:
      bindings:
        supply-out-0:
          destination: telemetry
      binders:
        eventhub:
          type: eventhubs
          default-candidate: true
          environment:
            spring:
              cloud:
                azure:
                  eventhubs:
                    namespace: <your-event-hub-namespace>
      poller:
        initial-delay: 0
        fixed-delay: 1000
```
This configures the output stream for supply-out-0 to use the telemetry event hub. It also specifies the namespace to connect to, as well as some polling values. Note that this configuration contains no connection information: the connection is made with the user-assigned managed identity, and the spring-cloud-azure-stream-binder-eventhubs library takes care of getting the token from AAD.
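The binding name supply-out-0 follows from the name of the supply bean: Spring Cloud Stream names the bindings of functional beans `<functionName>-in-<index>` for inputs and `<functionName>-out-<index>` for outputs. The sketch below only reproduces that naming rule in plain Java, to make the link between the bean name and the configuration key explicit. The class `BindingNameSketch` and its methods are hypothetical helpers and not part of Spring.

```java
class BindingNameSketch {

    // Naming rule for output bindings of functional beans: <functionName>-out-<index>.
    static String outputBinding(String functionName, int index) {
        return functionName + "-out-" + index;
    }

    // Naming rule for input bindings of functional beans: <functionName>-in-<index>.
    static String inputBinding(String functionName, int index) {
        return functionName + "-in-" + index;
    }

    // Full property key under which the destination of a binding is configured.
    static String destinationProperty(String bindingName) {
        return "spring.cloud.stream.bindings." + bindingName + ".destination";
    }

    public static void main(String[] args) {
        // The Supplier bean is named "supply" and has a single output, index 0.
        String binding = outputBinding("supply", 0);
        System.out.println(binding);
        System.out.println(destinationProperty(binding));
    }
}
```

If the bean were renamed, the key under `bindings` in the config repository would have to change with it, otherwise the destination setting would no longer apply to the function's output.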
-
Commit these changes to the config repo.

    git add .
    git commit -m 'added event hub supply'
    git push

-
Rebuild the customers service.
mvn clean package -DskipTests -rf :spring-petclinic-customers-service
-
Rebuild the container image and push it to the container registry.

    cd staging-acr
    cp ../spring-petclinic-customers-service/target/spring-petclinic-customers-service-$VERSION.jar spring-petclinic-customers-service-$VERSION.jar
    docker build -t $MYACR.azurecr.io/spring-petclinic-customers-service:$VERSION \
        --build-arg ARTIFACT_NAME=spring-petclinic-customers-service-$VERSION.jar \
        --build-arg APP_PORT=8080 \
        --build-arg AI_JAR=ai.jar \
        .
    docker push $MYACR.azurecr.io/spring-petclinic-customers-service:$VERSION

-
Delete the current running customers service pod and double check that it starts running again.

    kubectl get pods
    kubectl delete pod <customers-service-pod>
    kubectl get pods -w
    kubectl logs <customers-service-pod> -f

In the logs you should see messages related to the connection to the event hub.
If you see errors during the startup of the customers service, check whether the error message explains why it is not working as expected, and double check all the above steps. To redeploy the customers service: rebuild the jar file and copy it to the staging-acr directory, rebuild the container image, and delete the previously running customers-service pod. Additionally, you may delete the running config-server pod before deleting the customers-service pod, so that you are sure you are running with the latest config. Also make sure the config-server starts running properly again.
-
Once you see the customers service properly running again without errors, you can start testing out sending some test events. Navigate in the running petclinic application to Owners - Register. In the new owner screen fill out the form and select Submit.
-
In the customers service log output, double check that you see messages indicating that events are being sent to the event hub.
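Two log statements from the code added earlier are worth looking for here: `+++Sending events+++`, written by createOwner, and `Manually sending message ...`, written by the supply function for each message it passes on. The sketch below emulates that hand-off with plain JDK types to show which lines to expect and roughly how many. The `ArrayDeque` is only a stand-in for the Reactor sink and the class and method names are hypothetical. The real log lines are formatted by the logging framework, print the whole message rather than just the payload, and may interleave differently.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

class LogOrderSketch {

    // Stand-in for the buffering sink: events wait here until they are drained.
    private final Queue<String> buffer = new ArrayDeque<>();

    // Collected "log" lines, in the order this emulation writes them.
    final List<String> log = new ArrayList<>();

    // Emulates createOwner: log once, then enqueue 1 owner event + 100 pet events.
    void createOwner(String firstName, String lastName) {
        log.add("+++Sending events+++");
        buffer.add("New owner created: " + firstName + " " + lastName + " with many pets ...");
        for (int i = 0; i < 100; i++) {
            buffer.add("Pet " + i);
        }
    }

    // Emulates the supply function: log each buffered message as it is passed on.
    void drain() {
        String payload;
        while ((payload = buffer.poll()) != null) {
            log.add("Manually sending message " + payload);
        }
    }

    public static void main(String[] args) {
        LogOrderSketch sketch = new LogOrderSketch();
        sketch.createOwner("George", "Franklin");
        sketch.drain();
        System.out.println(sketch.log.size() + " log lines, first: " + sketch.log.get(0));
    }
}
```

For one submitted owner form, this emulation yields one `+++Sending events+++` line followed by 101 `Manually sending message` lines. If the producerError handler logs `Handling Producer ERROR` instead, sending to the event hub failed.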
-
In the Azure portal, navigate to your resource group and select the event hub. On the overview screen graphs you should see a peak in requests and messages.
This peak might not be visible immediately. If it is not, wait a minute and refresh the page.