Update an existing microservice to receive events from the Event Hub
In this task, you will update the vets microservice to receive events from the telemetry event hub. You can use the following guidance to implement these changes:
- Sending and Receiving Message by Azure Event Hubs and Spring Cloud Stream Binder Eventhubs in Spring Boot Application
- Spring Cloud Stream with Azure Event Hubs
- Use Java to send events to or receive events from Azure Event Hubs
Step by step guidance
- Reading messages from the event hub also means you need to checkpoint which part of the stream you have already read and processed. You will use a storage account to keep track of the checkpoints. Create the storage account and a container with the steps below:
STORAGE_ACCOUNT_NAME=stg$APPNAME$UNIQUEID
echo $STORAGE_ACCOUNT_NAME
az storage account create --name $STORAGE_ACCOUNT_NAME --resource-group $RESOURCE_GROUP --location $LOCATION --sku "Standard_LRS"
az storage account show --name $STORAGE_ACCOUNT_NAME --resource-group $RESOURCE_GROUP --query id -o tsv
STORAGE_ACCOUNT_ID=$(az storage account show --name $STORAGE_ACCOUNT_NAME --resource-group $RESOURCE_GROUP --query id -o tsv)
echo $STORAGE_ACCOUNT_ID
STORAGE_CONTAINER=eventhubs-binder
az storage container create --name $STORAGE_CONTAINER --account-name $STORAGE_ACCOUNT_NAME --public-access container --auth-mode login
- The checkpointing will be done through AAD authentication with your user assigned managed identity. You will need to give the managed identity access to the storage container:
az role assignment create --assignee $USER_ASSIGNED_CLIENT_ID --role 'Storage Account Contributor' --scope $STORAGE_ACCOUNT_ID
az role assignment create --assignee $USER_ASSIGNED_CLIENT_ID --role 'Storage Blob Data Contributor' --scope $STORAGE_ACCOUNT_ID
az role assignment create --assignee $USER_ASSIGNED_CLIENT_ID --role 'Storage Blob Data Owner' --scope $STORAGE_ACCOUNT_ID/containers/$STORAGE_CONTAINER
-
As a next step, in the config repository, replace the entire existing spring.cloud.stream configuration with the below configuration. Make sure to replace <your-event-hub-namespace> with the name of the namespace you created and <your-storage-account-name> with the name of the storage account you created.
  cloud:
    stream:
      bindings:
        consume-in-0:
          destination: telemetry
          group: $Default
        supply-out-0:
          destination: telemetry
      binders:
        eventhub:
          type: eventhubs
          default-candidate: true
          environment:
            spring:
              cloud:
                azure:
                  eventhubs:
                    namespace: <your-event-hub-namespace>
                    processor:
                      checkpoint-store:
                        container-name: eventhubs-binder
                        account-name: <your-storage-account-name>
      eventhubs:
        bindings:
          consume-in-0:
            consumer:
              checkpoint:
                mode: MANUAL
      poller:
        initial-delay: 0
        fixed-delay: 1000
This adds the consumer settings to the configuration. It specifies that the consumer uses the $Default consumer group of the telemetry event hub, adds the storage account as the checkpoint store, and sets the checkpoint mode to MANUAL.
-
Now that the configuration is done, you will update the spring-petclinic-vets-service. In your local application repository, use your favorite text editor to open the pom.xml file of the spring-petclinic-vets-service microservice, add another dependency element within the <!-- Spring Cloud --> section of the <dependencies> element, and save the change:
<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-stream-binder-eventhubs</artifactId>
</dependency>
-
In the spring-petclinic-microservices/spring-petclinic-vets-service/src/main/java/org/springframework/samples/petclinic/vets folder, update the VetsServiceApplication.java file with the below code:
package org.springframework.samples.petclinic.vets;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.samples.petclinic.vets.system.VetsProperties;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @author Maciej Szarlinski
 */
@EnableDiscoveryClient
@SpringBootApplication
@EnableConfigurationProperties(VetsProperties.class)
public class VetsServiceApplication {

    private static final Logger LOGGER = LoggerFactory.getLogger(VetsServiceApplication.class);

    public static void main(String[] args) {
        SpringApplication.run(VetsServiceApplication.class, args);
    }

    // Receives messages from the error channel of the telemetry destination and $Default group
    @ServiceActivator(inputChannel = "telemetry.$Default.errors")
    public void consumerError(Message<?> message) {
        LOGGER.error("Handling consumer ERROR: " + message);
    }
}
This adds a consumerError method to this class, which will be called in case of errors with the connection to your event hub.
-
In the spring-petclinic-microservices/spring-petclinic-vets-service/src/main/java/org/springframework/samples/petclinic/vets folder, add a new services folder and create an EventHubListener.java file in this folder with the below contents.
package org.springframework.samples.petclinic.vets.services;

import com.azure.spring.messaging.checkpoint.Checkpointer;
import com.azure.spring.messaging.eventhubs.support.EventHubsHeaders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;

import java.util.function.Consumer;

import static com.azure.spring.messaging.AzureHeaders.CHECKPOINTER;

@Configuration
public class EventHubListener {

    private static final Logger LOGGER = LoggerFactory.getLogger(EventHubListener.class);

    @Bean
    public Consumer<Message<String>> consume() {
        return message -> {
            // In MANUAL checkpoint mode the binder passes a Checkpointer in the message headers
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                message.getPayload(),
                message.getHeaders().get(EventHubsHeaders.PARTITION_KEY),
                message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER),
                message.getHeaders().get(EventHubsHeaders.OFFSET),
                message.getHeaders().get(EventHubsHeaders.ENQUEUED_TIME)
            );

            // Record that this message has been processed
            checkpointer.success()
                .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                .doOnError(error -> LOGGER.error("Exception found", error))
                .block();
        };
    }
}
This class has a consume method for consuming messages from the event hub. It uses a checkpointer to mark which messages in the stream have already been processed, and it logs each message it receives.
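To make the role of the checkpoint more concrete, here is a minimal plain-Java sketch of the idea. It is not the Azure SDK: the CheckpointSketch class and its InMemoryCheckpointStore are hypothetical stand-ins for the blob container, and a list index stands in for the sequence number. It illustrates that a consumer which checkpoints after each processed event resumes after the last checkpoint instead of re-reading the whole stream.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CheckpointSketch {

    /** Hypothetical stand-in for the checkpoint store: partition id -> last processed sequence number. */
    public static final class InMemoryCheckpointStore {
        private final Map<String, Long> lastProcessed = new HashMap<>();

        public void save(String partitionId, long sequenceNumber) {
            lastProcessed.put(partitionId, sequenceNumber);
        }

        /** Returns -1 when nothing has been checkpointed for the partition yet. */
        public long load(String partitionId) {
            return lastProcessed.getOrDefault(partitionId, -1L);
        }
    }

    /** Processes every event after the stored checkpoint and checkpoints each one once it is handled. */
    public static List<String> consume(String partitionId, List<String> events, InMemoryCheckpointStore store) {
        List<String> processed = new ArrayList<>();
        long start = store.load(partitionId) + 1;
        for (long seq = start; seq < events.size(); seq++) {
            processed.add(events.get((int) seq));
            store.save(partitionId, seq); // manual checkpoint after successful processing
        }
        return processed;
    }

    public static void main(String[] args) {
        InMemoryCheckpointStore store = new InMemoryCheckpointStore();
        List<String> stream = new ArrayList<>(List.of("e0", "e1", "e2"));

        System.out.println(consume("0", stream, store)); // prints [e0, e1, e2]

        // Simulate a restart: a new event arrived and the checkpoint survived.
        stream.add("e3");
        System.out.println(consume("0", stream, store)); // prints [e3]
    }
}
```

In the lab itself, checkpointer.success() plays the role of save in this sketch, and the eventhubs-binder container keeps the position across pod restarts.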
- In the spring-petclinic-microservices/spring-petclinic-vets-service/src/main/resources folder, update the contents of the application.yml file with the below contents:
spring:
  application:
    name: vets-service
  config:
    import: optional:configserver:${CONFIG_SERVER_URL:http://localhost:8888/}
  cache:
    cache-names: vets
  profiles:
    active: production
  cloud:
    function: consume;
---
spring:
  config:
    activate:
      on-profile: docker
    import: configserver:http://config-server:8888
This adds the configuration for the cloud function of the consume method.
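The binding and channel names used in these steps appear to follow Spring Cloud Stream naming conventions: a functional bean named consume is bound to the input binding consume-in-0, and the error channel of a consumer is named after its destination and group. The plain-Java sketch below only shows how these strings are composed. The BindingNames class and its methods are hypothetical helpers for illustration, not part of Spring or of the lab code.

```java
public class BindingNames {

    /** Input binding name for a functional bean: <functionName>-in-<index>. */
    public static String inputBinding(String functionName, int index) {
        return functionName + "-in-" + index;
    }

    /** Output binding name for a functional bean: <functionName>-out-<index>. */
    public static String outputBinding(String functionName, int index) {
        return functionName + "-out-" + index;
    }

    /** Error channel name for a consumer: <destination>.<group>.errors. */
    public static String errorChannel(String destination, String group) {
        return destination + "." + group + ".errors";
    }

    public static void main(String[] args) {
        System.out.println(inputBinding("consume", 0));            // prints consume-in-0
        System.out.println(outputBinding("supply", 0));            // prints supply-out-0
        System.out.println(errorChannel("telemetry", "$Default")); // prints telemetry.$Default.errors
    }
}
```

If you rename the consume bean or change the destination or consumer group, the binding name in the config repository and the inputChannel of the consumerError method would need to change accordingly.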
-
Make sure you saved all the files you just changed. In the Git Bash window, navigate back to the root folder of the spring petclinic repository and rebuild the vets microservice.
cd ~/workspaces/java-microservices-aks-lab/src
mvn clean package -DskipTests -rf :spring-petclinic-vets-service
-
Navigate to the staging-acr directory, copy the jar file of the vets-service and rebuild the container.
cd staging-acr
rm spring-petclinic-vets-service-$VERSION.jar
cp ../spring-petclinic-vets-service/target/spring-petclinic-vets-service-$VERSION.jar spring-petclinic-vets-service-$VERSION.jar
docker build -t $MYACR.azurecr.io/spring-petclinic-vets-service:$VERSION \
    --build-arg ARTIFACT_NAME=spring-petclinic-vets-service-$VERSION.jar \
    --build-arg APP_PORT=8080 \
    --build-arg AI_JAR=ai.jar \
    .
docker push $MYACR.azurecr.io/spring-petclinic-vets-service:$VERSION
-
Delete the currently running vets service pod and double check that it starts running again.
kubectl get pods
kubectl delete pod <vets-service-pod>
kubectl get pods -w
kubectl logs <vets-service-pod> -f
You should see messages in the logs related to the connection to the event hub.
If you see errors during the startup of the vets service, check whether the error message explains why it is not working as expected, and double check all the above steps. To redeploy the vets service, rebuild the jar file and copy it to the staging-acr directory, rebuild the container image, and delete the previously running pod of the vets-service. Additionally, you may delete the config-server pod before deleting the vets-service pod, so you are sure you are running with the latest config. In that case, make sure the config-server starts running properly again as well.