Developing Microservices with CQRS and Event Sourcing Patterns using GraphQL + Spring Boot + Kafka

Shazin Sadakath
4 min read · Jul 16, 2022


Command and Query Responsibility Segregation (CQRS for short) is a software development pattern which separates read and write operations for a data store. This segregation provides the following benefits:

  1. Improved performance
  2. High scalability
  3. High security

It also makes it easier to evolve the model with more commands in the future, and avoids the merge conflicts at the domain level that update operations can cause.

I first came across this pattern back in 2013–2014 while working on the Spring Security ACL project. There, mutable and immutable operations were segregated into separate models and service components within a single packaged module.

But since the explosion of the Microservices architecture in recent times, this pattern has once again come to the forefront due to the properties mentioned above.

Coupled with the Event Sourcing pattern, which stores the full series of read-only actions performed on data and provides the ability to replay those events to materialise the current state, this pattern fits the demands of today's highly distributed, loosely coupled Microservices perfectly well, for example for auditing and troubleshooting.
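To make the replay idea concrete, here is a minimal plain-Java sketch, using hypothetical event shapes rather than the project's actual classes: folding an immutable event log over an empty map reconstructs the current state.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical event shapes for illustration only; the project's real
// events (UserCreateEvent, UserDeleteEvent) are shown later in the post.
sealed interface Event permits Created, Deleted {}
record Created(String id, String name) implements Event {}
record Deleted(String id) implements Event {}

public class Replay {

    // Replaying the append-only event log from the beginning rebuilds the current state
    static Map<String, String> replay(List<Event> log) {
        Map<String, String> state = new LinkedHashMap<>();
        for (Event e : log) {
            if (e instanceof Created c) state.put(c.id(), c.name());
            else if (e instanceof Deleted d) state.remove(d.id());
        }
        return state;
    }

    public static void main(String[] args) {
        var log = List.of(new Created("1", "Alice"), new Created("2", "Bob"), new Deleted("1"));
        System.out.println(replay(log)); // prints {2=Bob}
    }
}
```

Because the log itself is never mutated, the same fold can be re-run at any point in time, which is what enables auditing and troubleshooting.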

Architecture

Diagram for User Domain Microservices with CQRS and Event Sourcing Patterns

The above diagram shows the architecture I have used to develop the Microservices that demonstrate the CQRS and Event Sourcing patterns at work. For this I have used the User domain, where the User Command Microservice handles the mutation responsibility and the User Query Microservice handles the query responsibility. Both Microservices are connected via an Event Bus, which in this case is an Apache Kafka topic.

The Microservices use GraphQL for the interface between client and backend instead of the traditional REST architecture.

User Command Microservice

This Microservice exposes a GraphQL Mutation type with createUser and deleteUser operations.

input UserInput {
    firstName: String
    lastName: String
    dateOfBirth: Date
    identityNumber: Int
}

type Mutation {
    # restricted
    createUser(user: UserInput!): User
    deleteUser(id: ID!): ID
}

The User Command Microservice uses an HSQLDB embedded database to store each User Command for auditing and troubleshooting purposes, and publishes it to the Event Bus in the form of a UserCreateEvent or UserDeleteEvent.

public record UserCreateEvent(User user) implements UserEvent {

    @Override
    public String getType() {
        return "CREATE";
    }
}

public record UserDeleteEvent(String id) implements UserEvent {

    @Override
    public String getType() {
        return "DELETE";
    }
}

DefaultUserEventHandler publishes the event before saving it to the embedded database.

@Component
public class DefaultUserEventHandler implements UserEventHandler {

    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultUserEventHandler.class);

    private final KafkaTemplate<String, String> kafkaTemplate;
    private final String kafkaTopicName;
    private final ObjectMapper objectMapper;
    private final UserCommandRepository userCommandRepository;

    public DefaultUserEventHandler(KafkaTemplate<String, String> kafkaTemplate,
                                   @Value("${kafka.topic.name}") String kafkaTopicName,
                                   ObjectMapper objectMapper,
                                   UserCommandRepository userCommandRepository) {
        this.kafkaTemplate = kafkaTemplate;
        this.kafkaTopicName = kafkaTopicName;
        this.objectMapper = objectMapper;
        this.userCommandRepository = userCommandRepository;
    }

    public boolean publishUserEvent(UserEvent event) {
        try {
            // Serialise the event, publish it to the Kafka topic,
            // then persist the command for auditing
            String payload = objectMapper.writeValueAsString(event);
            this.kafkaTemplate.send(kafkaTopicName, payload);
            this.userCommandRepository.save(new UserCommand(UUID.randomUUID().toString(), payload,
                    event.createdDate(), event.createdBy(), event.getType()));
            return true;
        } catch (Exception e) {
            LOGGER.error("Error while publishing user", e);
            return false;
        }
    }
}

This microservice will be running on port 8081 in the demonstration.

User Create Command
User Delete Command

User Query Microservice

This microservice listens to events coming in via the event bus and mutates the MySQL database based on the event type (CREATE, DELETE).

@Component
public class UserEventListener {

    private static final Logger LOGGER = LoggerFactory.getLogger(UserEventListener.class);

    private final ObjectMapper objectMapper;
    private final UserRepository userRepository;

    public UserEventListener(ObjectMapper objectMapper, UserRepository userRepository) {
        this.objectMapper = objectMapper;
        this.userRepository = userRepository;
    }

    @KafkaListener(topics = "${kafka.topic.name}")
    public void consume(String message) {
        try {
            // Peek at the event type first, then deserialise into the concrete event class
            Map<String, String> userEvent = objectMapper.readValue(message, Map.class);
            if (userEvent.get("type").equals("CREATE")) {
                UserCreateEvent userCreateEvent = objectMapper.readValue(message, UserCreateEvent.class);
                User user = userCreateEvent.user();
                userRepository.save(new UserEntity(UUID.randomUUID().toString(), user.firstName(),
                        user.lastName(), user.dateOfBirth(), user.identityNumber()));
            } else if (userEvent.get("type").equals("DELETE")) {
                UserDeleteEvent userDeleteEvent = objectMapper.readValue(message, UserDeleteEvent.class);
                userRepository.deleteById(userDeleteEvent.id());
            }
        } catch (Exception e) {
            LOGGER.error("Error while handling message", e);
        }
    }
}

It also exposes GraphQL queries: allUsers to retrieve all persisted Users from the database, and findOne to fetch a single User by id.

type User {
    id: ID!
    firstName: String
    lastName: String
    dateOfBirth: Date
    identityNumber: Int
}

type Query {
    allUsers: [User]
    findOne(id: ID!): User
}
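Conceptually, the query side is just a read-optimised store behind those two resolvers. A minimal in-memory sketch (hypothetical, standing in for the MySQL-backed UserRepository) shows the shape of the separation: the event listener is the only writer, the GraphQL resolvers only ever read.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory read model standing in for the MySQL-backed repository.
// The Kafka event listener writes into it; the GraphQL resolvers only read.
class UserReadModel {

    private final Map<String, String> users = new ConcurrentHashMap<>(); // id -> name

    void apply(String id, String name) { users.put(id, name); }  // on a CREATE event
    void remove(String id) { users.remove(id); }                 // on a DELETE event

    List<String> allUsers() {                                    // backs Query.allUsers
        return List.copyOf(users.values());
    }

    Optional<String> findOne(String id) {                        // backs Query.findOne
        return Optional.ofNullable(users.get(id));
    }
}
```

Because all writes arrive through the event bus, this read model is eventually consistent with the command side rather than transactionally consistent.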

This microservice will be running on port 8080 in the demonstration.

User Query

Finally, everything is containerised using Docker and glued together using Docker Compose.

Demonstration

The source code is available on GitHub. Please clap this post and follow me for more exciting posts like this.

UPDATE

After writing and publishing this post, I got some valuable feedback from a Principal Architect from France, who pointed out some shortcomings of the approach I took. I am always open to constructive criticism, so I redid some parts using Debezium, a Change Data Capture (CDC) tool, and committed the changes to a new branch. The architecture of that approach looks as follows.

Diagram for User Domain Microservices with CQRS and Event Sourcing Patterns with Debezium CDC

Now there are two User databases (userrdb and userwdb) for read and write respectively. The read database is eventually consistent: every time a record is inserted into or deleted from the user table in the write database, Debezium detects the change and sends it down the Apache Kafka event bus. The rest remains the same.
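For reference, registering a Debezium MySQL connector for the write database could look roughly like the fragment below. This is a sketch only: the connector names, credentials, and database/table names are illustrative guesses, not necessarily what the branch uses, and a production setup needs additional properties (such as schema history configuration).

```json
{
  "name": "userwdb-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "userwdb",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "1",
    "topic.prefix": "userwdb",
    "table.include.list": "userwdb.user"
  }
}
```

With CDC in place, the command service no longer has to dual-write to both its database and Kafka; Debezium derives the events from the database's change log instead.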
