Data Pipeline using AWS EventBridge, SQS and Lambda

Raghunandan Gupta
Apr 9, 2022

Data is valuable to everyone. Today it is used in every possible way, from giving you personalized suggestions to influencing your opinions. This is also one reason customers now want to be sure their data is secure and encrypted, so that competitors cannot take advantage of it. Many analytics companies not only collect data but also aggregate it and provide different views of it to end customers.

The challenge companies face is how each customer wants to view the data. An enterprise application typically gives all customers a similar view of the data. Some customers, however, want to run their own analytics on top of it or merge it with their confidential data to get more insights.

A data pipeline is the heart of every data company, and building one is not easy. It requires generic connectors that can be extended at any time to convert unstructured data into a structured form the pipeline understands. Filtering, enrichment, and removal of erroneous data are among the core modules that make a pipeline valuable.
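The stages named above can be sketched with the standard library alone. This is a minimal, hypothetical example, not the pipeline we built. The `key=value;key=value` input format, the class `MiniPipeline`, and its methods are assumptions made for illustration. A connector parses raw text into a structured record and drops malformed input. Caller-supplied filter and enrichment steps then run on whatever remains.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;

/** Hypothetical pipeline: connector (parse) -> drop erroneous input -> filter -> enrich. */
final class MiniPipeline {
    /** Connector: parses "key=value;key=value" text into a record; malformed input yields empty. */
    static Optional<Map<String, String>> parse(String raw) {
        Map<String, String> fields = new HashMap<>();
        for (String pair : raw.split(";")) {
            String[] kv = pair.split("=", 2);
            if (kv.length != 2 || kv[0].isBlank()) {
                return Optional.empty(); // erroneous data is removed here
            }
            fields.put(kv[0].trim(), kv[1].trim());
        }
        return Optional.of(fields);
    }

    /** Runs every raw input through parse, the customer-specific filter and the enrichment step. */
    static List<Map<String, String>> run(List<String> rawInputs,
                                         Predicate<Map<String, String>> filter,
                                         UnaryOperator<Map<String, String>> enrich) {
        List<Map<String, String>> output = new ArrayList<>();
        for (String raw : rawInputs) {
            parse(raw).filter(filter).map(enrich).ifPresent(output::add);
        }
        return output;
    }
}
```

Because each stage is a plain function, you can add a new connector or enrichment step without touching the others.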

While working for one of our clients, I received a requirement from a customer who was interested only in certain types of events:

  • Customers should be able to create rules to filter specific data
  • Customer data should not be shared with others
  • Customers should be able to integrate easily with the data stream

In most cases, requirements like these come from outside the company and apply to a few large customers.

To handle such cases, it is always good to have clear requirements, both to avoid rework and to understand the expected outcome better.

We discussed the requirements with the customer and agreed on the following:

  • Customers can subscribe with a webhook using any existing authentication method, such as Basic, API Key, or OAuth
  • Customers can rotate their API keys
  • If the customer’s API gateway is down, we retry delivery instead of dropping the event
  • Customers can subscribe to specific events only
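The retry requirement is about never losing an event. EventBridge supports this on rule targets through a retry policy and an Amazon SQS dead-letter queue. The sketch below only models the idea in plain Java. `RetryingDelivery`, the doubling backoff, and the in-memory queue are hypothetical stand-ins. The code records delays instead of sleeping so that the behaviour is easy to inspect.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical model of "retry, never drop": events that keep failing end up in a dead-letter queue. */
final class RetryingDelivery {
    final Deque<String> deadLetterQueue = new ArrayDeque<>();
    final List<Long> plannedDelaysMillis = new ArrayList<>(); // recorded instead of sleeping

    private final int maxAttempts;
    private final long baseDelayMillis;

    RetryingDelivery(int maxAttempts, long baseDelayMillis) {
        this.maxAttempts = maxAttempts;
        this.baseDelayMillis = baseDelayMillis;
    }

    /** Returns true if the webhook accepted the event; otherwise parks the event in the DLQ. */
    boolean deliver(String event, Predicate<String> webhook) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (webhook.test(event)) {
                return true;
            }
            if (attempt < maxAttempts) {
                // Exponential backoff before the next attempt: base, 2 * base, 4 * base, ...
                plannedDelaysMillis.add(baseDelayMillis << (attempt - 1));
            }
        }
        deadLetterQueue.addLast(event); // kept for later replay instead of being dropped
        return false;
    }
}
```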

After exploring several data-pipeline options, we decided to do a proof of concept (POC) on AWS. We found that Amazon EventBridge let us build a robust data pipeline that integrates with other AWS-managed services, fulfills the customer’s requirements, and requires less effort.

Onboarding

The first step is an onboarding process with a UI portal. Through the portal, customers subscribe to specific events and configure webhooks, API key rotation, retry counts and intervals, and so on.
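Retry settings entered in the portal eventually have to fit the retry policy of an EventBridge target. Per the EventBridge API reference at the time of writing, that policy allows 0 to 185 retry attempts and a maximum event age of 60 to 86,400 seconds. The validator below is a minimal, hypothetical sketch. It assumes the portal collects exactly those two values. EventBridge does not expose a retry-interval setting, because it backs off on its own. An interval field in the portal would therefore need its own mapping, which is not shown.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical validation of retry settings entered in the onboarding portal. */
final class RetrySettingsValidator {
    // Bounds of an EventBridge target RetryPolicy; re-check them against the current API reference.
    static final int MAX_RETRY_ATTEMPTS = 185;
    static final int MIN_EVENT_AGE_SECONDS = 60;
    static final int MAX_EVENT_AGE_SECONDS = 86_400; // 24 hours

    /** Returns human-readable problems; an empty list means the settings can be applied. */
    static List<String> validate(int retryAttempts, int maxEventAgeSeconds) {
        List<String> errors = new ArrayList<>();
        if (retryAttempts < 0 || retryAttempts > MAX_RETRY_ATTEMPTS) {
            errors.add("retryAttempts must be between 0 and " + MAX_RETRY_ATTEMPTS);
        }
        if (maxEventAgeSeconds < MIN_EVENT_AGE_SECONDS || maxEventAgeSeconds > MAX_EVENT_AGE_SECONDS) {
            errors.add("maxEventAgeSeconds must be between " + MIN_EVENT_AGE_SECONDS
                    + " and " + MAX_EVENT_AGE_SECONDS);
        }
        return errors;
    }
}
```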

We created a flow chart for onboarding and saved all details in a database in a predefined format. Those records could then be used for disaster recovery, or to recreate the onboarding if we needed to move to a different region.
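One way to store onboarding details in a predefined format is a fixed set of keys that can be written out and read back unchanged, for example when re-creating subscriptions in another region. The sketch below assumes a hypothetical `Subscription` record with a handful of fields and uses `java.util.Properties` as the format. Both are illustrative choices, not the schema we used.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.StringWriter;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Properties;

/** Hypothetical onboarding record, persisted with a fixed key layout so it can be replayed elsewhere. */
record Subscription(String tenant, String webhookUrl, String authType,
                    List<String> eventTypes, int retryAttempts) {

    String serialize() {
        Properties p = new Properties();
        p.setProperty("tenant", tenant);
        p.setProperty("webhookUrl", webhookUrl);
        p.setProperty("authType", authType);
        p.setProperty("eventTypes", String.join(",", eventTypes));
        p.setProperty("retryAttempts", Integer.toString(retryAttempts));
        StringWriter out = new StringWriter();
        try {
            p.store(out, "onboarding record");
        } catch (IOException e) {
            throw new UncheckedIOException(e); // not expected with an in-memory writer
        }
        return out.toString();
    }

    static Subscription deserialize(String text) {
        Properties p = new Properties();
        try {
            p.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new Subscription(
                p.getProperty("tenant"),
                p.getProperty("webhookUrl"),
                p.getProperty("authType"),
                List.of(p.getProperty("eventTypes").split(",")),
                Integer.parseInt(p.getProperty("retryAttempts")));
    }
}
```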

EventBridge / Event Bus

  • Amazon EventBridge is a serverless event bus that ingests data from your own apps, SaaS apps, and AWS services and routes that data to targets.

Amazon EventBridge rules

  • A rule matches incoming events and sends them to targets for processing. A single rule can send an event to multiple targets, which then run in parallel.
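The following is a simplified, hypothetical model of how an event pattern selects events. Every field in the pattern must appear in the event, and nested objects are compared recursively. A list of values matches when the event's value equals any one of them. Real EventBridge patterns also support prefix, numeric, `anything-but` and other operators, which this sketch deliberately leaves out.

```java
import java.util.List;
import java.util.Map;

/** Simplified, hypothetical model of EventBridge content matching (exact values only). */
final class PatternMatcher {
    /**
     * Every field named in the pattern must be present in the event. Nested objects are matched
     * recursively; a list of values matches if the event's value equals any one of them.
     * Event fields that the pattern does not mention are ignored.
     */
    @SuppressWarnings("unchecked")
    static boolean matches(Map<String, Object> pattern, Map<String, Object> event) {
        for (Map.Entry<String, Object> entry : pattern.entrySet()) {
            Object actual = event.get(entry.getKey());
            Object expected = entry.getValue();
            if (expected instanceof Map<?, ?> nested) {
                if (!(actual instanceof Map<?, ?> actualMap)
                        || !matches((Map<String, Object>) nested, (Map<String, Object>) actualMap)) {
                    return false;
                }
            } else if (expected instanceof List<?> allowed) {
                if (!allowed.contains(actual)) {
                    return false;
                }
            } else {
                return false; // in real patterns, leaves are always arrays
            }
        }
        return true;
    }
}
```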

AWS Lambda

  • AWS Lambda is a serverless, event-driven compute service that lets you run code for virtually any type of application or backend service without provisioning or managing servers. You can trigger Lambda from over 200 AWS services and software as a service (SaaS) applications and only pay for what you use.

API destinations

  • Amazon EventBridge API destinations are HTTP endpoints that you can invoke as the target of a rule, similar to how you invoke an AWS service or resource as a target.
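With an API-key connection, EventBridge adds the configured key as an HTTP header on each call to the destination. The request below is built with `java.net.http.HttpRequest` but never sent. It approximates what a customer's webhook receives, which makes it handy for testing the endpoint by hand. The endpoint URL, the header values, and the `WebhookCall` class are placeholders.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Hypothetical illustration of a call an API destination makes to a webhook with API-key auth. */
final class WebhookCall {
    static HttpRequest build(String endpoint, String apiKeyName, String apiKeyValue, String eventJson) {
        return HttpRequest.newBuilder(URI.create(endpoint))
                .header("Content-Type", "application/json") // assumed content type
                .header(apiKeyName, apiKeyValue)            // the connection sends the key as a header
                .POST(HttpRequest.BodyPublishers.ofString(eventJson))
                .build();
    }
}
```

To actually send it, pass the request to `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())`.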

Data Flow

We were already storing the data in a database in a structured format, and the customer was interested in that same data. So we integrated the events with AWS Lambda. Every event was sent to a Lambda function, which decided where the event should go next based on the tenant.
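The routing step in the Lambda function can be sketched as a lookup from tenant to destination. This is a hypothetical illustration. The `TenantRouter` class and the idea of holding onboarded tenants in a set are assumptions. The `event_bus_<tenant>` and `app.demo.<tenant>` names follow the conventions used in the code later in this post.

```java
import java.util.Optional;
import java.util.Set;

/** Hypothetical routing step in the publishing Lambda, using the naming conventions from this post. */
final class TenantRouter {
    record Route(String eventBusName, String source) {}

    private final Set<String> onboardedTenants;

    TenantRouter(Set<String> onboardedTenants) {
        this.onboardedTenants = Set.copyOf(onboardedTenants);
    }

    /** Events from tenants that have not been onboarded are not forwarded anywhere. */
    Optional<Route> route(String tenant) {
        if (tenant == null || !onboardedTenants.contains(tenant)) {
            return Optional.empty();
        }
        return Optional.of(new Route("event_bus_" + tenant, "app.demo." + tenant));
    }
}
```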

Common AWS Tags

Tags attach metadata to each resource. Ideally they include:

  • Created By
  • Created Date
  • Team
  • Owner
  • Product
  • Environment
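
import com.amazonaws.services.eventbridge.model.Tag;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;

// Tags identify the owning tenant, who created the resource, and when (epoch milliseconds)
private static List<Tag> getTags(String tenant) {
    Tag tenantTag = new Tag().withKey("tenant").withValue(tenant);
    Tag createdByTag = new Tag().withKey("CreatedBy").withValue("Provisioner_Programmatically");
    Tag timestampTag = new Tag().withKey("timestamp").withValue(String.valueOf(Instant.now().toEpochMilli()));
    return Arrays.asList(tenantTag, createdByTag, timestampTag);
}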
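`getTags` sets only three of the tags listed above. Below is a minimal sketch of building the fuller set. It assumes the caller passes in the team, owner, product, and environment values, and `StandardTags` is a hypothetical helper. The sketch also checks the basic AWS tag limits: keys up to 128 characters, values up to 256, and no keys starting with the reserved `aws:` prefix.

```java
import java.time.Instant;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper that builds the recommended tag set and checks basic AWS tag limits. */
final class StandardTags {
    static Map<String, String> build(String tenant, String team, String owner,
                                     String product, String environment, Instant createdAt) {
        Map<String, String> tags = new LinkedHashMap<>();
        tags.put("tenant", tenant);
        tags.put("CreatedBy", "Provisioner_Programmatically");
        tags.put("CreatedDate", createdAt.toString()); // ISO-8601, readable in the console
        tags.put("Team", team);
        tags.put("Owner", owner);
        tags.put("Product", product);
        tags.put("Environment", environment);
        tags.forEach(StandardTags::check);
        return tags;
    }

    private static void check(String key, String value) {
        if (key.isEmpty() || key.length() > 128 || key.startsWith("aws:")) {
            throw new IllegalArgumentException("invalid tag key: " + key);
        }
        if (value == null || value.length() > 256) {
            throw new IllegalArgumentException("invalid value for tag " + key);
        }
    }
}
```

The resulting map can then be converted into SDK `Tag` objects in the same way `getTags` builds them.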

Create Event Bus

// amazonEventBridge (an AmazonEventBridge client), LOGGER, the Tuple2 type and the custom
// AWSEventBridgeException are defined elsewhere in the class and are not shown here.
private static Optional<Tuple2<String, String>> createEventBus(String tenant) {
    try {
        CreateEventBusRequest createEventBusRequest = new CreateEventBusRequest();

        // Tags identify who created the resource and when
        createEventBusRequest.setTags(getTags(tenant));

        createEventBusRequest.setName("event_bus_" + tenant);
        CreateEventBusResult createEventBusResult = amazonEventBridge.createEventBus(createEventBusRequest);
        LOGGER.info("Event Bus created for tenant={} bus={}", tenant, createEventBusResult.getEventBusArn());
        // Returns (bus name, bus ARN); use the diamond instead of a raw Tuple2
        return Optional.of(new Tuple2<>(createEventBusRequest.getName(), createEventBusResult.getEventBusArn()));
    } catch (Exception ex) {
        LOGGER.error("Exception occurred while creating event bus for tenant={}", tenant, ex);
        throw new AWSEventBridgeException(ex.getMessage());
    }
}
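The bus name is built from the tenant identifier, so a tenant ID containing unexpected characters would make `CreateEventBus` fail. Custom event bus names may contain letters, digits, `.`, `-` and `_` (no `/`), up to 256 characters. Below is a hypothetical helper, `EventBusNames`, that sanitises the tenant before building the name.

```java
import java.util.regex.Pattern;

/** Hypothetical helper: derive a valid custom event bus name from a tenant identifier. */
final class EventBusNames {
    // Anything other than letters, digits, '.', '_' and '-' (this includes '/')
    private static final Pattern DISALLOWED = Pattern.compile("[^._A-Za-z0-9-]");
    private static final int MAX_LENGTH = 256;

    static String forTenant(String tenant) {
        String safeTenant = DISALLOWED.matcher(tenant).replaceAll("_");
        String name = "event_bus_" + safeTenant;
        return name.length() > MAX_LENGTH ? name.substring(0, MAX_LENGTH) : name;
    }
}
```

Sanitising can map two different tenant IDs to the same name (for example `a/b` and `a_b`), so the onboarding step should still check for duplicates.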

Update Lambda Environment Variables

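// findAWSLambdaFunctionConfig and updateLambdaEnvironmentVariables are helpers that are not shown here
public static void updateLambdaEnvVariables(Map<String, String> newEnvParams) {
    Optional<FunctionConfiguration> functionConfigurationOptional =
            findAWSLambdaFunctionConfig("lambda-postgresEventBridgePublisher-ujritvn8gYe6");
    if (functionConfigurationOptional.isPresent()) {
        FunctionConfiguration functionConfiguration = functionConfigurationOptional.get();
        // getEnvironment() can be null when the function has no environment variables yet
        Map<String, String> existingVariables = functionConfiguration.getEnvironment() == null
                ? new HashMap<>()
                : functionConfiguration.getEnvironment().getVariables();
        updateLambdaEnvironmentVariables(functionConfiguration.getFunctionName(), existingVariables, newEnvParams);
    }
}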
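Lambda's `UpdateFunctionConfiguration` replaces the whole set of environment variables. New values therefore have to be merged into the existing ones rather than sent alone. The combined size of all variables is also limited to 4 KB. Below is a hypothetical sketch of the merge step that a helper like `updateLambdaEnvironmentVariables` would need. The byte count (UTF-8 length of keys plus values) is only an approximation of how Lambda measures the limit.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper: merge new variables into existing ones before updating a Lambda function. */
final class LambdaEnvMerge {
    // Lambda limits the combined size of all environment variables to 4 KB.
    static final int MAX_TOTAL_BYTES = 4 * 1024;

    static Map<String, String> merge(Map<String, String> existing, Map<String, String> updates) {
        Map<String, String> merged = new HashMap<>();
        if (existing != null) {          // null when the function has no environment block yet
            merged.putAll(existing);
        }
        merged.putAll(updates);          // new values win over old ones with the same key
        int bytes = 0;
        for (Map.Entry<String, String> e : merged.entrySet()) {
            bytes += e.getKey().getBytes(StandardCharsets.UTF_8).length
                   + e.getValue().getBytes(StandardCharsets.UTF_8).length;
        }
        if (bytes > MAX_TOTAL_BYTES) {
            throw new IllegalStateException("environment would be " + bytes + " bytes, over the 4 KB limit");
        }
        return merged;
    }
}
```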

Create API Destination

// Also requires java.security.SecureRandom and java.util.Base64
private static final SecureRandom SECURE_RANDOM = new SecureRandom();

private static Target constructAPIDestinationTarget(String description, String apiDestinationName, String endpoint, Integer invocationRate) {
    // 1. Connection: stores the credentials EventBridge sends with every call (an API key header here).
    // Use a random key rather than name + timestamp, which would be guessable.
    // The key must also be shared with the customer so that their endpoint can validate it.
    byte[] keyBytes = new byte[32];
    SECURE_RANDOM.nextBytes(keyBytes);
    CreateConnectionApiKeyAuthRequestParameters apiKeyAuthParameters = new CreateConnectionApiKeyAuthRequestParameters()
            .withApiKeyName("x-api-key")
            .withApiKeyValue(Base64.getUrlEncoder().withoutPadding().encodeToString(keyBytes));

    CreateConnectionRequest createConnectionRequest = new CreateConnectionRequest()
            .withAuthorizationType(ConnectionAuthorizationType.API_KEY)
            .withAuthParameters(new CreateConnectionAuthRequestParameters().withApiKeyAuthParameters(apiKeyAuthParameters))
            .withDescription(description)
            .withName(apiDestinationName);
    CreateConnectionResult createConnectionResult = amazonEventBridge.createConnection(createConnectionRequest);
    LOGGER.info("Created API Connection = {}", createConnectionResult.getConnectionArn());

    // 2. API destination: webhook URL, HTTP method and a per-second invocation rate limit
    CreateApiDestinationRequest createApiDestinationRequest = new CreateApiDestinationRequest()
            .withInvocationEndpoint(endpoint)
            .withConnectionArn(createConnectionResult.getConnectionArn())
            .withHttpMethod(HttpMethod.POST.name())
            .withName(apiDestinationName)
            .withDescription(description)
            .withInvocationRateLimitPerSecond(invocationRate);
    CreateApiDestinationResult createApiDestinationResult = amazonEventBridge.createApiDestination(createApiDestinationRequest);
    LOGGER.info("Created API Destination = {}", createApiDestinationResult.getApiDestinationArn());

    // 3. Rule target: the role must allow events:InvokeApiDestination.
    // The target ID only has to be unique within the rule; the destination name is stable,
    // whereas a seconds-based timestamp collides if two targets are created in the same second.
    return new Target()
            .withArn(createApiDestinationResult.getApiDestinationArn())
            .withId(apiDestinationName)
            .withRoleArn("arn:aws:iam::594459988176:role/service-role/Amazon_EventBridge_Invoke_Api_Destination_xxxxx");
}
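The customer also wanted to rotate API keys. A common approach, assumed here rather than taken from our implementation, is to keep the previous key valid for a short grace period. That way, deliveries already in flight are not rejected while the connection is being updated with the new key (the EventBridge `UpdateConnection` call is not shown). `RotatingApiKey` is a hypothetical helper for the receiving side.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.SecureRandom;
import java.time.Duration;
import java.time.Instant;
import java.util.Base64;

/** Hypothetical key-rotation helper: the previous key stays valid for a grace period after rotation. */
final class RotatingApiKey {
    private static final SecureRandom RANDOM = new SecureRandom();

    private final Duration grace;
    private String current;
    private String previous;
    private Instant previousValidUntil = Instant.MIN;

    RotatingApiKey(Duration grace) {
        this.grace = grace;
        this.current = newKey();
    }

    /** 32 random bytes, URL-safe Base64 without padding (43 characters). */
    static String newKey() {
        byte[] bytes = new byte[32];
        RANDOM.nextBytes(bytes);
        return Base64.getUrlEncoder().withoutPadding().encodeToString(bytes);
    }

    /** Generates and returns a new key; the old key keeps working until now + grace. */
    String rotate(Instant now) {
        previous = current;
        previousValidUntil = now.plus(grace);
        current = newKey();
        return current;
    }

    String current() {
        return current;
    }

    boolean accepts(String presented, Instant now) {
        if (presented == null) {
            return false;
        }
        if (same(presented, current)) {
            return true;
        }
        return previous != null && now.isBefore(previousValidUntil) && same(presented, previous);
    }

    // MessageDigest.isEqual does not stop at the first mismatching byte, unlike String.equals
    private static boolean same(String a, String b) {
        return MessageDigest.isEqual(a.getBytes(StandardCharsets.UTF_8), b.getBytes(StandardCharsets.UTF_8));
    }
}
```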

Create Event Rule in Event Bus

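// createRule is defined elsewhere; it is assumed to create the rule with this pattern on the given bus.
// tenant, eventBusName and ruleName come from the onboarding record.
String eventPattern = "{\n"
        + "  \"source\": [\"app.demo." + tenant + "\"],\n"
        + "  \"detail\": {\n"
        + "    \"eventtype\": [\"ORDERED\"]\n"
        + "  }\n"
        + "}";
createRule(tenant, eventBusName, ruleName, eventPattern);
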
private static Optional<String> addTargetWithEventBusAndRule(String eventBus, String ruleName, List<Target> targets) {
    try {
        PutTargetsRequest putTargetsRequest = new PutTargetsRequest()
                .withEventBusName(eventBus)
                .withRule(ruleName)
                .withTargets(targets);
        PutTargetsResult putTargetsResult = amazonEventBridge.putTargets(putTargetsRequest);
        // PutTargets can partially fail without throwing, so check the failed entries explicitly
        Integer failedCount = putTargetsResult.getFailedEntryCount();
        if (failedCount != null && failedCount > 0) {
            throw new AWSEventBridgeException("Failed to add targets: " + putTargetsResult.getFailedEntries());
        }
        LOGGER.info("Added new targets to Eventbus={} Event Rule={} Response={}", eventBus, ruleName, putTargetsResult.getSdkHttpMetadata());
        return Optional.of("OK");
    } catch (Exception ex) {
        LOGGER.error("Exception occurred while creating target for rule={} for event bus={}", ruleName, eventBus, ex);
        throw new AWSEventBridgeException(ex.getMessage());
    }
}
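Building the pattern by hand, as in the `createRule` call above, breaks as soon as a value contains a quote. It also cannot handle a customer who subscribes to more than one event type. `EventPatterns` below is a hypothetical helper that renders the same pattern shape from a tenant and a list of event types, with minimal JSON escaping. In a real project, a JSON library would be the simpler choice.

```java
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical helper that renders a rule's event pattern from a tenant's subscription. */
final class EventPatterns {
    static String forTenant(String tenant, List<String> eventTypes) {
        String types = eventTypes.stream()
                .map(EventPatterns::quote)
                .collect(Collectors.joining(", "));
        return "{\"source\": [" + quote("app.demo." + tenant) + "], "
             + "\"detail\": {\"eventtype\": [" + types + "]}}";
    }

    /** Minimal JSON string escaping: quotes, backslashes and control characters. */
    static String quote(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"' -> sb.append("\\\"");
                case '\\' -> sb.append("\\\\");
                default -> {
                    if (c < 0x20) {
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
                }
            }
        }
        return sb.append('"').toString();
    }
}
```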

Once this setup was complete, customers received the events matching their rules on their HTTP webhooks.


Raghunandan Gupta

I am a coder by heart and try to learn new things along with sharpening existing skills. https://www.linkedin.com/in/raghunitb/