Event-Driven Application using AWS EventBridge, Lambda and S3 in Java — Part 2
You have probably heard of AWS S3: it is used in almost every company, whether to host a website, serve static content, or act as file storage. In this part, we will walk through a use case and design a solution for it.
Problem Statement
You work at Pillow Software Limited, which provides a multi-tenant solution where customers can view statistics about their networks and how their devices are performing. Recently your company acquired Sheet Software Limited, whose application also collects relevant data but only provides it in the form of CSV files. So if you want the data, you need to schedule an export job, and the CSV file will be generated on the system where that application is running.
Ask
You need to design a system where a customer can subscribe and receive the same data over HTTP.
Data Flow
- A Job Scheduler continuously submits export jobs to the Sheet Application
- The Sheet Application receives each job export request and generates CSV files
- A lightweight Agent reads all generated files, sends them to Amazon S3, and deletes them locally to avoid disk space issues
- An AWS Lambda function reads the S3 event
- The Lambda function internally uses an S3 client to fetch the S3 object as an InputStream, based on the bucket and key
- The Lambda function reads all lines from the InputStream and builds a payload from the columns and the values of each row
- It creates an EventBridge client and sends the payload to an event bus on EventBridge
- A rule on EventBridge is configured with an API endpoint as its target to send the data onward
Design Diagram
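At a high level, the flow looks like this:

Job Scheduler -> Sheet Application -> CSV files -> Lightweight Agent
    -> Amazon S3 -> (S3 event) -> AWS Lambda -> EventBridge event bus
    -> Rule + Target -> Customer API (HTTP)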
Components
Job Scheduler
This schedules the job at a specific interval and, based on the available parameters, submits export jobs to the Sheet Application via its API.
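As a minimal sketch, such a scheduler could use a ScheduledExecutorService with java.net.http.HttpClient. The export endpoint URL and request body below are hypothetical, since the Sheet Application's actual API is not shown in this article:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class JobScheduler {

    // Hypothetical export endpoint of the Sheet Application.
    private static final String EXPORT_URL = "http://sheet-app.internal/api/jobs/export";

    private final HttpClient httpClient = HttpClient.newBuilder()
            .connectTimeout(Duration.ofSeconds(10))
            .build();
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    public void start() {
        // Submit an export job every 5 minutes.
        scheduler.scheduleAtFixedRate(this::submitJob, 0, 5, TimeUnit.MINUTES);
    }

    private void submitJob() {
        try {
            HttpRequest request = HttpRequest.newBuilder()
                    .uri(URI.create(EXPORT_URL))
                    .header("Content-Type", "application/json")
                    .POST(HttpRequest.BodyPublishers.ofString("{\"report\":\"device-stats\"}"))
                    .build();
            HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
            System.out.println("Export job submitted, status = " + response.statusCode());
        } catch (Exception e) {
            // Swallow failures so one bad submission does not kill the schedule.
            System.err.println("Failed to submit export job: " + e);
        }
    }
}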
Lightweight Agent
The lightweight Agent checks whether a job export has completed. If it has, the Agent reads the CSV file referenced in the job export response and checks whether the file is empty; if it is not, the file is uploaded to S3 and then deleted locally.
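A sketch of the Agent's upload step, using the same AWS SDK v1 S3 client that appears later in this article. The bucket name and key prefix here are hypothetical:

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import java.io.File;

public class LightweightAgent {

    // Hypothetical bucket name; use the bucket your Lambda trigger watches.
    private static final String BUCKET = "sheet-exports-bucket";

    private final AmazonS3 amazonS3 = AmazonS3ClientBuilder.defaultClient();

    public void uploadAndClean(File csvFile) {
        // Skip empty files so the Lambda is not invoked for nothing.
        if (csvFile.length() == 0) {
            return;
        }
        // Upload under a prefix so the S3 trigger can filter on it.
        amazonS3.putObject(BUCKET, "exports/" + csvFile.getName(), csvFile);
        // Delete locally to avoid disk space issues.
        if (!csvFile.delete()) {
            System.err.println("Could not delete " + csvFile.getAbsolutePath());
        }
    }
}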
Amazon S3
A bucket will be created where all files will be uploaded.
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.S3Object;

public class S3Service {

    private final AmazonS3 amazonS3;

    public S3Service() {
        this.amazonS3 = AmazonS3ClientBuilder.defaultClient();
    }

    public S3Object getObject(GetObjectRequest getObjectRequest) {
        return amazonS3.getObject(getObjectRequest);
    }
}
AWS Lambda
This reads the S3 event and, based on its parameters (bucket and key), fetches the object as an InputStream, parses the CSV, and sends the data to the event bus using an EventBridge client.
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.events.S3Event;
import com.amazonaws.services.lambda.runtime.events.models.s3.S3EventNotification;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.S3Object;
import com.google.gson.Gson;
import org.apache.commons.io.IOUtils;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class S3EventConsumer {

    private final Gson gson = new Gson();
    private final EventBridgeService eventBridgeService = new EventBridgeService();
    private final S3Service s3Service = new S3Service();

    public Object handleRequest(S3Event input, Context context) throws IOException {
        context.getLogger().log("Received S3 Event = " + input);
        // One event can carry multiple records, one per created object.
        for (S3EventNotification.S3EventNotificationRecord record : input.getRecords()) {
            String s3Key = record.getS3().getObject().getKey();
            String s3Bucket = record.getS3().getBucket().getName();
            context.getLogger().log("found id: " + s3Bucket + " " + s3Key);
            // Retrieve the S3 object and stream its contents.
            try {
                S3Object object = s3Service.getObject(new GetObjectRequest(s3Bucket, s3Key));
                InputStream objectData = object.getObjectContent();
                List<String> lines = IOUtils.readLines(objectData, Charset.defaultCharset());
                context.getLogger().log("Total Lines " + lines.size());
                List<String> columns = new ArrayList<>();
                List<Map<String, String>> allEntries = new ArrayList<>();
                for (String line : lines) {
                    if (!line.contains(",")) {
                        // Not a CSV row; skip it.
                        continue;
                    } else if (columns.isEmpty()) {
                        // The first row containing commas is the header.
                        columns = List.of(line.split(","));
                    } else {
                        // Map each value to its column name, preserving column order.
                        Map<String, String> entry = new LinkedHashMap<>();
                        // A limit of -1 keeps trailing empty values so rows stay aligned with the header.
                        String[] values = line.split(",", -1);
                        int count = 0;
                        for (String column : columns) {
                            entry.put(column, values[count].replace("\"", ""));
                            count++;
                        }
                        allEntries.add(entry);
                    }
                }
                // Publish one event per CSV row.
                allEntries.forEach(entry ->
                        eventBridgeService.putEventsToBridge("nten_confluent_lambda_event_bus",
                                gson.toJson(entry), "app.demo.sheet", "sqmediator", context));
                context.getLogger().log("s3Key: " + s3Key + " s3Bucket: " + s3Bucket
                        + " Pushed Entries = " + allEntries.size());
            } catch (Exception e) {
                context.getLogger().log("s3Key: " + s3Key + " s3Bucket: " + s3Bucket + " Exception = " + e);
            }
        }
        return "success";
    }
}
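For example, given a hypothetical export whose header row is device,cpu,memory and whose next row is router-1,40,70, the handler would publish one event whose detail payload is {"device":"router-1","cpu":"40","memory":"70"}.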
AWS Lambda S3 Trigger
The trigger below will be invoked every time an object is created in S3. Using a prefix and a suffix, we can filter which created objects invoke the function.
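As a sketch, the same notification can be configured programmatically with the AWS SDK for Java. The bucket name, function ARN, prefix, and suffix below are hypothetical placeholders:

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.BucketNotificationConfiguration;
import com.amazonaws.services.s3.model.Filter;
import com.amazonaws.services.s3.model.LambdaConfiguration;
import com.amazonaws.services.s3.model.S3KeyFilter;
import java.util.EnumSet;

public class S3TriggerSetup {

    public static void main(String[] args) {
        AmazonS3 amazonS3 = AmazonS3ClientBuilder.defaultClient();
        // Hypothetical ARN; use the ARN of your S3EventConsumer Lambda.
        String functionArn = "arn:aws:lambda:us-east-1:123456789012:function:S3EventConsumer";
        // Invoke the Lambda on every object-created event...
        LambdaConfiguration lambdaConfiguration = new LambdaConfiguration(functionArn,
                EnumSet.of(com.amazonaws.services.s3.model.S3Event.ObjectCreated));
        // ...but only for keys matching exports/*.csv.
        lambdaConfiguration.setFilter(new Filter().withS3KeyFilter(new S3KeyFilter()
                .withFilterRules(
                        S3KeyFilter.FilterRuleName.Prefix.newRule("exports/"),
                        S3KeyFilter.FilterRuleName.Suffix.newRule(".csv"))));
        BucketNotificationConfiguration notificationConfiguration = new BucketNotificationConfiguration()
                .addConfiguration("csv-created", lambdaConfiguration);
        amazonS3.setBucketNotificationConfiguration("sheet-exports-bucket", notificationConfiguration);
    }
}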
Amazon EventBridge
On EventBridge we can create rules that filter events from the event bus and send matching events to the API configured as the rule's target.
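A minimal sketch of creating such a rule and target with the SDK, matching the source "app.demo.sheet" used by our Lambda. The rule name, API destination ARN, and IAM role ARN below are hypothetical:

import com.amazonaws.regions.Regions;
import com.amazonaws.services.eventbridge.AmazonEventBridge;
import com.amazonaws.services.eventbridge.AmazonEventBridgeClient;
import com.amazonaws.services.eventbridge.model.PutRuleRequest;
import com.amazonaws.services.eventbridge.model.PutTargetsRequest;
import com.amazonaws.services.eventbridge.model.Target;

public class EventBridgeRuleSetup {

    public static void main(String[] args) {
        AmazonEventBridge amazonEventBridge = AmazonEventBridgeClient.builder()
                .withRegion(Regions.US_EAST_1)
                .build();
        // Match events published with source "app.demo.sheet".
        amazonEventBridge.putRule(new PutRuleRequest()
                .withName("sheet-csv-rule")
                .withEventBusName("nten_confluent_lambda_event_bus")
                .withEventPattern("{\"source\":[\"app.demo.sheet\"]}"));
        // Route matched events to the customer's API endpoint.
        amazonEventBridge.putTargets(new PutTargetsRequest()
                .withRule("sheet-csv-rule")
                .withEventBusName("nten_confluent_lambda_event_bus")
                .withTargets(new Target()
                        .withId("customer-api")
                        .withArn("arn:aws:events:us-east-1:123456789012:api-destination/customer-api/abc123")
                        .withRoleArn("arn:aws:iam::123456789012:role/eventbridge-invoke-api")));
    }
}

The EventBridgeService below is what the Lambda uses to publish each CSV row onto the bus.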
import com.amazonaws.regions.Regions;
import com.amazonaws.services.eventbridge.AmazonEventBridge;
import com.amazonaws.services.eventbridge.AmazonEventBridgeClient;
import com.amazonaws.services.eventbridge.model.PutEventsRequest;
import com.amazonaws.services.eventbridge.model.PutEventsRequestEntry;
import com.amazonaws.services.eventbridge.model.PutEventsResult;
import com.amazonaws.services.lambda.runtime.Context;
import java.nio.charset.StandardCharsets;
import java.util.Calendar;

public class EventBridgeService {

    private final AmazonEventBridge amazonEventBridge;

    public EventBridgeService() {
        this.amazonEventBridge = AmazonEventBridgeClient.builder()
                .withRegion(Regions.US_EAST_1)
                .build();
    }

    public void putEventsToBridge(String eventBusName, String event, String source, String detailType, Context context) {
        PutEventsRequestEntry putEventsRequestEntry = new PutEventsRequestEntry()
                .withSource(source)
                .withEventBusName(eventBusName)
                .withTime(Calendar.getInstance().getTime())
                .withDetailType(detailType)
                .withDetail(event);
        PutEventsRequest putEventsRequest = new PutEventsRequest()
                .withEntries(putEventsRequestEntry);
        int sizeInBytes = getSize(putEventsRequestEntry);
        context.getLogger().log("source => " + source + " :: Event Bridge Request Size => " + sizeInBytes);
        // EventBridge rejects entries larger than 256KB, so check before sending.
        if (sizeInBytes > (256 * 1024)) {
            throw new RuntimeException("Size exceeds 256KB. Not sending event.");
        }
        PutEventsResult putEventsResult = amazonEventBridge.putEvents(putEventsRequest);
        context.getLogger().log("Event Bridge Response = " + putEventsResult.getFailedEntryCount());
    }

    // Computes the entry size the same way EventBridge does:
    // https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-putevent-size.html
    private int getSize(PutEventsRequestEntry entry) {
        int size = 0;
        if (entry.getTime() != null) {
            size += 14; // the time field counts as 14 bytes
        }
        size += entry.getSource().getBytes(StandardCharsets.UTF_8).length;
        size += entry.getDetailType().getBytes(StandardCharsets.UTF_8).length;
        if (entry.getDetail() != null) {
            size += entry.getDetail().getBytes(StandardCharsets.UTF_8).length;
        }
        if (entry.getResources() != null) {
            for (String resource : entry.getResources()) {
                if (resource != null) {
                    size += resource.getBytes(StandardCharsets.UTF_8).length;
                }
            }
        }
        return size;
    }
}
Using the above, we can send events to the customer's API, and the solution can be scaled easily.