
Update an existing microservice to receive events from the Event Hub

In this task, you will update the vets microservice to receive events from the telemetry event hub. You can use the following guidance to implement these changes:

Step by step guidance

  1. Reading messages from the event hub also requires checkpointing, that is, recording which part of the stream you have already read and processed. You will use a storage account to keep track of the checkpoints. Create the storage account and a container with the following steps:
   STORAGE_ACCOUNT_NAME=stg$APPNAME$UNIQUEID
   echo $STORAGE_ACCOUNT_NAME
   az storage account create --name $STORAGE_ACCOUNT_NAME --resource-group $RESOURCE_GROUP --location $LOCATION --sku "Standard_LRS" 
   az storage account show --name $STORAGE_ACCOUNT_NAME --resource-group $RESOURCE_GROUP --query id -o tsv
   STORAGE_ACCOUNT_ID=$(az storage account show --name $STORAGE_ACCOUNT_NAME --resource-group $RESOURCE_GROUP --query id -o tsv)
   echo $STORAGE_ACCOUNT_ID

   STORAGE_CONTAINER=eventhubs-binder
   az storage container create --name $STORAGE_CONTAINER --account-name $STORAGE_ACCOUNT_NAME --public-access container --auth-mode login
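
Storage account names must be 3 to 24 characters long and may contain only lowercase letters and digits, so a name assembled from $APPNAME and $UNIQUEID can turn out to be invalid. The following is a minimal sketch of a check you could run before creating the account. The function name is_valid_storage_account_name is hypothetical and not part of the lab.

```shell
# Hypothetical helper (not part of the lab): returns 0 when the given name
# is 3-24 characters long and uses only lowercase letters and digits.
is_valid_storage_account_name() {
    name="$1"
    len=${#name}
    # Length must be between 3 and 24 characters.
    if [ "$len" -lt 3 ] || [ "$len" -gt 24 ]; then
        return 1
    fi
    # Reject any character that is not a lowercase letter or a digit.
    # The characters are listed explicitly to avoid locale-dependent ranges.
    case "$name" in
        *[!abcdefghijklmnopqrstuvwxyz0123456789]*) return 1 ;;
    esac
    return 0
}

# Example usage before running 'az storage account create':
# is_valid_storage_account_name "$STORAGE_ACCOUNT_NAME" || echo "invalid name: $STORAGE_ACCOUNT_NAME"
```

If the check fails, shortening or lowercasing the values that make up the name is usually enough.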
  1. Checkpointing authenticates through Azure Active Directory (AAD) with your user-assigned managed identity. Give the managed identity access to the storage account and the storage container:
   az role assignment create --assignee $USER_ASSIGNED_CLIENT_ID --role 'Storage Account Contributor' --scope $STORAGE_ACCOUNT_ID
   az role assignment create --assignee $USER_ASSIGNED_CLIENT_ID --role 'Storage Blob Data Contributor' --scope $STORAGE_ACCOUNT_ID
   az role assignment create --assignee $USER_ASSIGNED_CLIENT_ID --role 'Storage Blob Data Owner' --scope $STORAGE_ACCOUNT_ID/blobServices/default/containers/$STORAGE_CONTAINER
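
To confirm that the account-level role assignments exist, you can list the role names for the managed identity and compare them with the roles you expect. The sketch below reads role names on standard input and reports the ones that are missing. The function name report_missing_roles is hypothetical, and the check only covers the two roles assigned at the storage account scope.

```shell
# Hypothetical helper (not part of the lab): reads role names from stdin,
# one per line, and prints every expected account-level role that is absent.
# Returns 1 if at least one role is missing, 0 otherwise.
report_missing_roles() {
    # Strip carriage returns, which some shells on Windows add to CLI output.
    assigned=$(tr -d '\r')
    status=0
    for role in "Storage Account Contributor" "Storage Blob Data Contributor"; do
        # -x matches the whole line, -F treats the role name as a fixed string.
        if ! printf '%s\n' "$assigned" | grep -qxF "$role"; then
            echo "missing: $role"
            status=1
        fi
    done
    return $status
}

# Example usage against the storage account scope:
# az role assignment list --assignee "$USER_ASSIGNED_CLIENT_ID" --scope "$STORAGE_ACCOUNT_ID" \
#     --query "[].roleDefinitionName" -o tsv | report_missing_roles
```

New role assignments can take a little while to become visible, so an initial "missing" result may resolve itself when you run the check again.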
  1. As a next step, in the config repository, replace the entire existing spring.cloud.stream configuration with the configuration below. Make sure to replace <your-event-hub-namespace> with the name of the namespace you created and <your-storage-account-name> with the name of the storage account you created.

      cloud:  
        stream: 
          bindings:
            consume-in-0:
              destination: telemetry
              group: $Default
            supply-out-0:
              destination: telemetry
          binders:
            eventhub:
              type: eventhubs
              default-candidate: true
              environment:
                spring:
                  cloud:
                    azure:
                      eventhubs:
                        namespace: <your-event-hub-namespace>
                        processor:
                          checkpoint-store:
                            container-name: eventhubs-binder
                            account-name: <your-storage-account-name>      
          eventhubs:
            bindings:
              consume-in-0:
                consumer:
                  checkpoint:
                    mode: MANUAL    
          poller:
            initial-delay: 0
            fixed-delay: 1000    
    

This adds the consumer parts to the configuration. The consume-in-0 binding reads from the telemetry event hub through its $Default consumer group, and the storage account serves as the checkpoint store. The checkpoint mode is set to MANUAL, which means the application code decides when a message is checkpointed.
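
A common mistake at this point is committing the configuration with a placeholder still in it. The sketch below scans a file for leftover <your-...> placeholders. The function name check_placeholders is hypothetical, and the file path you pass is whichever file in your config repository holds this configuration.

```shell
# Hypothetical helper (not part of the lab): prints every line, with its line
# number, that still contains a <your-...> placeholder.
# Returns 0 when the file is clean and 1 when placeholders remain.
check_placeholders() {
    file="$1"
    if grep -n '<your-[a-z-]*>' "$file"; then
        return 1
    fi
    return 0
}

# Example usage (the path is an assumption, adjust it to your config repository):
# check_placeholders path/to/your/config-file.yml && echo "no placeholders left"
```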

  1. Now that the configuration is done, you will update the spring-petclinic-vets-service. In your local application repository, use your favorite text editor to open the pom.xml file of the spring-petclinic-vets-service microservice, add to it another dependency element within the <!-- Spring Cloud --> section of the <dependencies> element, and save the change:

     <dependency>
       <groupId>com.azure.spring</groupId>
       <artifactId>spring-cloud-azure-stream-binder-eventhubs</artifactId>
     </dependency>  
    
  2. In the spring-petclinic-microservices/spring-petclinic-vets-service/src/main/java/org/springframework/samples/petclinic/vets folder, update the VetsServiceApplication.java file with the below code:

    package org.springframework.samples.petclinic.vets;
       
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.boot.context.properties.EnableConfigurationProperties;
    import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
    import org.springframework.samples.petclinic.vets.system.VetsProperties;
       
    import org.springframework.integration.annotation.ServiceActivator;
    import org.springframework.messaging.Message;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
       
    /**
     * @author Maciej Szarlinski
     */
    @EnableDiscoveryClient
    @SpringBootApplication
    @EnableConfigurationProperties(VetsProperties.class)
    public class VetsServiceApplication {
       
    	private static final Logger LOGGER = LoggerFactory.getLogger(VetsServiceApplication.class);
       
    	public static void main(String[] args) {
    		SpringApplication.run(VetsServiceApplication.class, args);
    	}
       
    	@ServiceActivator(inputChannel = "telemetry.$Default.errors")
        public void consumerError(Message<?> message) {
            LOGGER.error("Handling consumer ERROR: " + message);
        }
    }
    

    This adds a consumerError method to this class, which will be called in case of errors while consuming from your event hub.

  3. In the spring-petclinic-microservices/spring-petclinic-vets-service/src/main/java/org/springframework/samples/petclinic/vets folder, add a new services folder and create an EventHubListener.java file in this folder with the contents below.

    package org.springframework.samples.petclinic.vets.services;
       
       
    import com.azure.spring.messaging.eventhubs.support.EventHubsHeaders;
    import com.azure.spring.messaging.checkpoint.Checkpointer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.messaging.Message;
       
    import java.util.function.Consumer;
       
    import static com.azure.spring.messaging.AzureHeaders.CHECKPOINTER;
       
    @Configuration
    public class EventHubListener {
       
        // Log under this class's own name so listener output is easy to filter.
        private static final Logger LOGGER = LoggerFactory.getLogger(EventHubListener.class);
       
        @Bean
        public Consumer<Message<String>> consume() {
            return message -> {
                Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
                LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                        message.getPayload(),
                        message.getHeaders().get(EventHubsHeaders.PARTITION_KEY),
                        message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER),
                        message.getHeaders().get(EventHubsHeaders.OFFSET),
                        message.getHeaders().get(EventHubsHeaders.ENQUEUED_TIME)
                );
       
                checkpointer.success()
                        .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                        .doOnError(error -> LOGGER.error("Exception found", error))
                        .block();
            };
        }
    }   
    

This class has a consume method for consuming messages from the event hub. It logs each received message and then uses the checkpointer to record that the message has been processed, so a restarted consumer can resume after the last checkpointed message.

  1. In the spring-petclinic-microservices/spring-petclinic-vets-service/src/main/resources folder, replace the contents of the application.yml file with the following:
   spring:
     application:
       name: vets-service
     config:
       import: optional:configserver:${CONFIG_SERVER_URL:http://localhost:8888/}
     cache:
       cache-names: vets
     profiles:
       active: production
     cloud:
       function:
         definition: consume
       
   ---
   spring:
     config:
       activate:
         on-profile: docker
       import: configserver:http://config-server:8888  

This adds the Spring Cloud Function configuration for the consume method, which ties it to the consume-in-0 binding.

  1. Make sure you have saved all the files you just changed. In the Git Bash window, navigate back to the root folder of the Spring Petclinic repository and rebuild the vets microservice.

    cd ~/workspaces/java-microservices-aks-lab/src
    mvn clean package -DskipTests -rf :spring-petclinic-vets-service
    
  2. Navigate to the staging-acr directory, copy the jar file of the vets-service and rebuild the container.

    cd staging-acr
    rm spring-petclinic-vets-service-$VERSION.jar
       
    cp ../spring-petclinic-vets-service/target/spring-petclinic-vets-service-$VERSION.jar spring-petclinic-vets-service-$VERSION.jar
    docker build -t $MYACR.azurecr.io/spring-petclinic-vets-service:$VERSION \
        --build-arg ARTIFACT_NAME=spring-petclinic-vets-service-$VERSION.jar \
        --build-arg APP_PORT=8080 \
        --build-arg AI_JAR=ai.jar \
        .
       
    docker push $MYACR.azurecr.io/spring-petclinic-vets-service:$VERSION
    
  3. Delete the currently running vets service pod and double check that it starts running again.

    kubectl get pods
    kubectl delete pod <vets-service-pod>
       
    kubectl get pods -w
       
    kubectl logs <vets-service-pod> -f
    

    You should see messages related to the connection to the event hub in the logs.

    If you see errors during the startup of the vets service, check whether the error message explains why it is not working as expected, and double check all the above steps. To redeploy the vets service: rebuild the jar file, copy it to the staging-acr directory, rebuild the container image, and delete the previously running vets-service pod. Additionally, you may delete the currently running config-server pod before deleting the vets-service pod, so you are sure you are running with the latest config. Make sure the config-server starts running properly again as well.
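
When the logs are long, a quick count of the messages written by the code above can show whether events are being received and checkpointed. The sketch below counts the log lines produced by the LOGGER calls in EventHubListener and VetsServiceApplication. The function name summarize_vets_log is hypothetical, and the patterns assume you kept the log texts from this task unchanged.

```shell
# Hypothetical helper (not part of the lab): reads log output from stdin and
# prints how many lines contain each of the messages logged by the vets service.
summarize_vets_log() {
    log=$(cat)
    for pattern in "New message received" "successfully checkpointed" \
                   "Handling consumer ERROR" "Exception found"; do
        # grep -c prints 0 and exits non-zero when nothing matches,
        # so '|| true' keeps the function usable under 'set -e'.
        count=$(printf '%s\n' "$log" | grep -cF "$pattern" || true)
        echo "$pattern: $count"
    done
}

# Example usage:
# kubectl logs <vets-service-pod> | summarize_vets_log
```

If the received count is higher than the checkpointed count, the "Exception found" lines are a reasonable first place to look.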