
Update an existing microservice to send events to the Event Hub

You will now implement the functionality that emulates sending events to the telemetry Event Hub. For this you will update the customers-service: each time a new customer is created, the service sends a new event to the event hub, together with 100 new pet events to simulate some dummy load.

Step by step guidance

  1. In the customers-service pom.xml file, add the following extra dependency:

        <dependency>
          <groupId>com.azure.spring</groupId>
          <artifactId>spring-cloud-azure-stream-binder-eventhubs</artifactId>
        </dependency>
    
  2. Replace the contents of the src/main/java/org/springframework/samples/petclinic/customers/CustomersServiceApplication.java file with:

    package org.springframework.samples.petclinic.customers;
       
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
       
    import org.springframework.integration.annotation.ServiceActivator;
    import org.springframework.messaging.Message;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
       
    /**
     * @author Maciej Szarlinski
     */
    @EnableDiscoveryClient
    @SpringBootApplication
    public class CustomersServiceApplication {
       
    	private static final Logger LOGGER = LoggerFactory.getLogger(CustomersServiceApplication.class);
       
    	public static void main(String[] args) {
    		SpringApplication.run(CustomersServiceApplication.class, args);
    	}
       
    	@ServiceActivator(inputChannel = "telemetry.errors")
    	public void producerError(Message<?> message) {
    		LOGGER.error("Handling Producer ERROR: {}", message);
    	}
    }
    

    This adds an extra logger and a method that will be called in case of errors in the sending of events.

  3. Replace the contents of the src/spring-petclinic-customers-service/src/main/java/org/springframework/samples/petclinic/customers/web/OwnerResource.java file with:

    package org.springframework.samples.petclinic.customers.web;
       
    import io.micrometer.core.annotation.Timed;
    import lombok.RequiredArgsConstructor;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.http.HttpStatus;
    import org.springframework.samples.petclinic.customers.model.Owner;
    import org.springframework.samples.petclinic.customers.model.OwnerRepository;
    import org.springframework.web.bind.annotation.*;
       
    import jakarta.validation.Valid;
    import jakarta.validation.constraints.Min;
    import java.util.List;
    import java.util.Optional;
       
    import reactor.core.publisher.Sinks;
    import reactor.core.publisher.Flux;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.MessageBuilder;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.context.annotation.Bean;
    import java.util.function.Supplier;
       
    import org.springframework.beans.factory.annotation.Autowired;
       
    /**
     * @author Juergen Hoeller
     * @author Ken Krebs
     * @author Arjen Poutsma
     * @author Michael Isvy
     * @author Maciej Szarlinski
     */
    @RequestMapping("/owners")
    @RestController
    @Timed("petclinic.owner")
    @RequiredArgsConstructor
    @Slf4j
    class OwnerResource {
       
        private final OwnerRepository ownerRepository;
       
        @Autowired
        private Sinks.Many<Message<String>> many;
       
        private static final Logger LOGGER = LoggerFactory.getLogger(OwnerResource.class);
       
        /**
         * Create Owner
         */
        @PostMapping
        @ResponseStatus(HttpStatus.CREATED)
        public Owner createOwner(@Valid @RequestBody Owner owner) {
            LOGGER.info("+++Sending events+++");
            many.emitNext(MessageBuilder.withPayload("New owner created: " + owner.getFirstName() + " " + owner.getLastName() + " with many pets ...").build(), Sinks.EmitFailureHandler.FAIL_FAST);
            for(int i = 0; i < 100; i++) {
                many.emitNext(MessageBuilder.withPayload("Pet " + i).build(), Sinks.EmitFailureHandler.FAIL_FAST);
            }
       
            return ownerRepository.save(owner);
        }
       
        /**
         * Read single Owner
         */
        @GetMapping(value = "/{ownerId}")
        public Optional<Owner> findOwner(@PathVariable("ownerId") @Min(1) int ownerId) {
            return ownerRepository.findById(ownerId);
        }
       
        /**
         * Read List of Owners
         */
        @GetMapping
        public List<Owner> findAll() {
            return ownerRepository.findAll();
        }
       
        /**
         * Update Owner
         */
        @PutMapping(value = "/{ownerId}")
        @ResponseStatus(HttpStatus.NO_CONTENT)
        public void updateOwner(@PathVariable("ownerId") @Min(1) int ownerId, @Valid @RequestBody Owner ownerRequest) {
            final Optional<Owner> owner = ownerRepository.findById(ownerId);
            final Owner ownerModel = owner.orElseThrow(() -> new ResourceNotFoundException("Owner "+ownerId+" not found"));
       
            // This is done by hand for simplicity purpose. In a real life use-case we should consider using MapStruct.
            ownerModel.setFirstName(ownerRequest.getFirstName());
            ownerModel.setLastName(ownerRequest.getLastName());
            ownerModel.setCity(ownerRequest.getCity());
            ownerModel.setAddress(ownerRequest.getAddress());
            ownerModel.setTelephone(ownerRequest.getTelephone());
            log.info("Saving owner {}", ownerModel);
            ownerRepository.save(ownerModel);
        }
    }
    

    This adds an additional sink, which the createOwner method uses to emit events.
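
    The emit pattern in createOwner (one owner event followed by 100 pet events) can be sketched in isolation. The OwnerEventPayloads class below is a hypothetical helper, not part of the lab code; it only builds the same payload strings so the 1 + 100 pattern is easy to check without Spring or Reactor on the classpath.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the lab code: builds the same payload strings
// that createOwner emits, so the 1 + 100 event pattern can be checked on its own.
final class OwnerEventPayloads {

    private OwnerEventPayloads() {
    }

    // Returns one "New owner created" payload followed by petCount "Pet i" payloads.
    static List<String> forNewOwner(String firstName, String lastName, int petCount) {
        List<String> payloads = new ArrayList<>(petCount + 1);
        payloads.add("New owner created: " + firstName + " " + lastName + " with many pets ...");
        for (int i = 0; i < petCount; i++) {
            payloads.add("Pet " + i);
        }
        return payloads;
    }

    public static void main(String[] args) {
        List<String> payloads = forNewOwner("George", "Franklin", 100);
        System.out.println(payloads.size() + " events, first: " + payloads.get(0));
    }
}
```

    In the controller itself, each payload is wrapped with MessageBuilder.withPayload(...).build() and handed to many.emitNext(..., Sinks.EmitFailureHandler.FAIL_FAST), as in the listing above.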

  4. In the /src/spring-petclinic-customers-service/src/main/java/org/springframework/samples/petclinic/customers/config/ folder, add an extra ManualProducerConfiguration.java file with the below contents:

    package org.springframework.samples.petclinic.customers.config;
       
       
    import com.azure.spring.messaging.eventhubs.support.EventHubsHeaders;
    import com.azure.spring.messaging.AzureHeaders;
    import com.azure.spring.messaging.checkpoint.Checkpointer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.annotation.Profile;
    import org.springframework.messaging.Message;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Sinks;
       
    import java.util.function.Consumer;
    import java.util.function.Supplier;
       
    @Configuration
    public class ManualProducerConfiguration {
       
        private static final Logger LOGGER = LoggerFactory.getLogger(ManualProducerConfiguration.class);
       
        @Bean
        public Sinks.Many<Message<String>> many() {
            return Sinks.many().unicast().onBackpressureBuffer();
        }
       
        @Bean
        public Supplier<Flux<Message<String>>> supply(Sinks.Many<Message<String>> many) {
            return () -> many.asFlux()
                             .doOnNext(m -> LOGGER.info("Manually sending message {}", m))
                             .doOnError(t -> LOGGER.error("Error encountered", t));
        }
    }   
    

    The ManualProducerConfiguration uses the sink to send the events to the event hub.
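
    To get an intuition for what the unicast sink with a backpressure buffer does, here is a JDK-only analogy. It is not Reactor and not how the binder is implemented. The BufferedSinkSketch class and its method names are made up for illustration. The idea it shows is that the producer side only buffers events, while a single consumer picks them up later through a Supplier.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Supplier;

// JDK-only analogy (not Reactor): producers write to a buffer and a single
// consumer drains it, mimicking how the sink decouples createOwner from the binder.
final class BufferedSinkSketch {

    // Unbounded buffer, so emit never blocks or rejects in this sketch.
    private final BlockingQueue<String> buffer = new LinkedBlockingQueue<>();

    // Plays the role of many.emitNext(...): stores the event for later delivery.
    void emit(String payload) {
        buffer.add(payload);
    }

    // Plays the role of the supply bean: hands all buffered events to the consumer.
    Supplier<List<String>> supply() {
        return () -> {
            List<String> drained = new ArrayList<>();
            buffer.drainTo(drained);
            return drained;
        };
    }

    public static void main(String[] args) {
        BufferedSinkSketch sink = new BufferedSinkSketch();
        sink.emit("New owner created");
        sink.emit("Pet 0");
        System.out.println(sink.supply().get());
    }
}
```

    The real code differs in one important way: the supply bean returns a Flux, so the binder subscribes once and receives events as they are emitted, instead of draining a queue on each call.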

  5. Update the /src/spring-petclinic-customers-service/src/main/resources/application.yml file, and replace its contents with:

    ```yaml
    spring:
      application:
        name: customers-service
      config:
        import: optional:configserver:${CONFIG_SERVER_URL:http://localhost:8888/}
      cloud:
        function: supply;

    ---
    spring:
      config:
        activate:
          on-profile: docker
        import: configserver:http://config-server:8888
    ```

    This adds an extra spring cloud function for the supply method in the ManualProducerConfiguration class.

  1. Save the changes to all 5 files.

  2. In the config repository, add the spring.cloud.stream configuration for sending messages to the event hub. Add this section right under the spring statement on line 10. Make sure to replace <your-event-hub-namespace> with the name of the namespace you created.

   ```yaml
     cloud:
       stream:
         bindings:
           supply-out-0:
             destination: telemetry
         binders:
           eventhub:
             type: eventhubs
             default-candidate: true
             environment:
               spring:
                 cloud:
                   azure:
                     eventhubs:
                       namespace: <your-event-hub-namespace>
         poller:
           initial-delay: 0
           fixed-delay: 1000
   ```

This configures the supply-out-0 output binding to send to the telemetry event hub. It also specifies the namespace to connect to, as well as some values for polling. Note that this configuration contains no connection information: the connection is made with the user assigned managed identity, and the spring-cloud-azure-stream-binder-eventhubs library takes care of getting the token from AAD.
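
The binding name supply-out-0 is not arbitrary. Spring Cloud Stream derives binding names for functional beans as the function name, then -in- or -out-, then an index, which is how the supply bean from ManualProducerConfiguration ends up bound to the telemetry destination. The small BindingNames class below is a hypothetical illustration of that naming convention, not an API of the framework.

```java
// Hypothetical illustration of the functional binding naming convention:
// <functionName>-in-<index> for inputs and <functionName>-out-<index> for outputs.
final class BindingNames {

    private BindingNames() {
    }

    // Name of the output binding for a Supplier or Function bean.
    static String output(String functionName, int index) {
        return functionName + "-out-" + index;
    }

    // Name of the input binding for a Consumer or Function bean.
    static String input(String functionName, int index) {
        return functionName + "-in-" + index;
    }

    public static void main(String[] args) {
        // The Supplier bean method in this lab is called "supply".
        System.out.println(output("supply", 0));
    }
}
```

If you rename the supply method, the key under bindings in the config repository has to change accordingly.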

  1. Commit these changes to the config repo.

    git add .
    git commit -m 'added event hub supply'
    git push   
    
  2. Rebuild the customers service.

    mvn clean package -DskipTests -rf :spring-petclinic-customers-service
    
  3. Rebuild the container image and push it to the container registry.

    cd staging-acr
    cp ../spring-petclinic-customers-service/target/spring-petclinic-customers-service-$VERSION.jar spring-petclinic-customers-service-$VERSION.jar
       
    docker build -t $MYACR.azurecr.io/spring-petclinic-customers-service:$VERSION \
        --build-arg ARTIFACT_NAME=spring-petclinic-customers-service-$VERSION.jar \
        --build-arg APP_PORT=8080 \
        --build-arg AI_JAR=ai.jar \
        .
          
    docker push $MYACR.azurecr.io/spring-petclinic-customers-service:$VERSION
    
  4. Delete the current running customers service pod and double check that it starts running again.

    kubectl get pods
    kubectl delete pod <customers-service-pod>
       
    kubectl get pods -w
       
    kubectl logs <customers-service-pod> -f
    

    You should see in the logs messages related to the connection to the event hub.

    If you see errors during the startup of the customers service, check whether the error message explains why it is not working as expected, and double check all the above steps. To redeploy the customers service: rebuild the jar file and copy it to the staging-acr directory, rebuild the container image and stop the previously running pod of the customers-service. Additionally, you may also delete the previous version of the config-server before deleting the customers-service pod, so you are sure you are running with the latest config. Make sure the config-server starts running properly again as well.

  5. Once you see the customers service properly running again without errors, you can start testing out sending some test events. Navigate in the running petclinic application to Owners - Register. In the new owner screen fill out the form and select Submit.

  6. In the customers service log output, double check that you see messages indicating that events are being sent to the event hub.

  7. In the Azure portal, navigate to your resource group and select the event hub. On the overview screen graphs you should see a peak in requests and messages.

    The peak might not be visible in the graph immediately. If so, wait a minute and refresh the page.