In this article, I share an ETL pipeline that is simple to implement, scales on demand, and provides near real-time analysis of application logs. The data set used is available at https://github.com/logpai/loghub. I downloaded the Spark log files for this article.
If the log format is well known (e.g. Apache web server, Syslog) and in JSON, then sending the logs directly to a CloudWatch log group and adding an Elasticsearch publisher is a quick and handy solution. If the logs are in a custom format, however, they must be parsed and converted to JSON before they are ingested into Elasticsearch. There are two common ways to achieve this.
a. Agent side: Every compute instance (current or new) running the application code must also be bundled with the conversion logic, so that the agent reading log lines knows how to parse them and convert them to JSON before sending them to the next stage. This strategy works well when there is a manageable number of agents extracting logs and you don’t expect the conversion logic to change. A drawback is that every agent must run the same version of every type of conversion logic, and must know the type of each log line in order to apply the correct transformation.
b. Server side: Log lines arrive in raw format from multiple agents at a centralized server, and the conversion logic is applied there before the data is written to storage. The benefit of this approach is that the conversion logic resides in one location and can be updated easily without redeploying or updating remote agents. It is easier to maintain and removes the tight coupling between transformation logic and extraction logic. This is the approach I am sharing in this article.
This article is based on this AWS whitepaper, with a focus on the following use cases:
- Near real-time analysis
- Ingestion of custom log formats
- Derivation of custom metrics
The components used in this solution are managed by AWS and thus scale on-demand with no single point of failure.
Lambda function for transformation
First, create a Lambda function that will receive Spark log records (up to the maximum buffer size) from Kinesis Data Firehose. The function is invoked by Kinesis Data Firehose, and its response is sent on to the destination. Invoking Lambda on in-flight records serves multiple purposes, such as transforming, compressing, and filtering bulk data in near real-time (the minimum buffer interval is 60 seconds).
The transformation logic I am using is based on an existing Lambda blueprint named “kinesis-firehose-process-record-python”. I have modified it to parse a custom log record and convert the output to JSON. The transformation code used in this article is available on GitHub.
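To make the shape of such a transformation concrete, here is a minimal sketch. It is not the code from the GitHub repository mentioned above. It assumes the Spark log lines look like `17/06/09 20:10:40 INFO executor.CoarseGrainedExecutorBackend: Registered signal handlers`, and the pattern, field names, and `parse_line` helper are my own illustrative choices. The `recordId`, `result`, and `data` fields are the ones Kinesis Data Firehose expects back from a transformation function.

```python
import base64
import json
import re

# Assumed layout of a Spark log line (illustrative, adjust to your logs):
# "17/06/09 20:10:40 INFO executor.CoarseGrainedExecutorBackend: Registered signal handlers"
LOG_PATTERN = re.compile(
    r"^(?P<date>\d{2}/\d{2}/\d{2}) (?P<time>\d{2}:\d{2}:\d{2}) "
    r"(?P<level>[A-Z]+) (?P<component>[^:]+): (?P<message>.*)$"
)


def parse_line(line):
    """Return a dict of fields for a matching log line, or None otherwise."""
    match = LOG_PATTERN.match(line.strip())
    return match.groupdict() if match else None


def lambda_handler(event, context):
    output = []
    for record in event["records"]:
        # Firehose hands each record over base64-encoded; decode to text first.
        line = base64.b64decode(record["data"]).decode("utf-8")
        parsed = parse_line(line)
        if parsed is None:
            # Return unparseable records unchanged and flag them as failed.
            output.append({
                "recordId": record["recordId"],
                "result": "ProcessingFailed",
                "data": record["data"],
            })
            continue
        payload = json.dumps(parsed).encode("utf-8")
        output.append({
            "recordId": record["recordId"],
            "result": "Ok",
            "data": base64.b64encode(payload).decode("utf-8"),
        })
    # Every incoming recordId must appear exactly once in the response.
    return {"records": output}
```

Records that do not match the pattern are marked `ProcessingFailed` rather than silently dropped, so they remain visible when troubleshooting.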
Tip: A Lambda function has a payload limit of 6 MB for both request and response. Ensure that the Kinesis Data Firehose buffer size for sending requests to the function is less than or equal to 6 MB, and that the response your function returns does not exceed 6 MB either. See https://docs.aws.amazon.com/lambda/latest/dg/gettingstarted-limits.html
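One way to guard against an oversized response is to estimate its size before returning it. The sketch below is only an approximation: it measures the JSON-serialized response, which may differ slightly from how the Lambda runtime serializes it, and the helper names are hypothetical.

```python
import json

# 6 MB payload limit from the tip above, expressed in bytes.
LAMBDA_PAYLOAD_LIMIT_BYTES = 6 * 1024 * 1024


def response_size_bytes(response):
    """Estimate the size of a response dict once serialized to JSON."""
    return len(json.dumps(response).encode("utf-8"))


def fits_lambda_limit(response, limit=LAMBDA_PAYLOAD_LIMIT_BYTES):
    """Return True if the estimated response size is within the limit."""
    return response_size_bytes(response) <= limit
```

A transformation function could call `fits_lambda_limit` just before returning and log a warning when the check fails, which makes an otherwise opaque invocation error easier to diagnose.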
Create Kinesis Data Firehose delivery stream
Once the Lambda function is in place, the next step is to create the Firehose delivery stream. I have used the “--cli-input-json” option to supply the input parameters in a JSON file. These files are available on GitHub. Please replace the values in them with values from your own account before using them.
# create kinesis data firehose stream using inputs passed in json file (destination as S3)
aws firehose create-delivery-stream --cli-input-json file://input-to-create-dest-s3.json
I recommend first setting S3 as the destination and testing data ingestion and transformation with the “Test with demo data” feature in the AWS console. Another quick option is to use the “aws firehose” CLI and execute the put-record command.
After sending a few test records, click the Monitoring tab to verify whether any errors were reported. The transformed records should have been uploaded to the destination S3 bucket.
After testing the Lambda transformation with test data, delete this delivery stream and the associated test data from S3.
Deleting a Kinesis Data Firehose delivery stream does not delete the associated destination S3 bucket. The bucket must be deleted manually.
Configuring Elasticsearch cluster
Now that we have tested our Lambda transformation and Kinesis Data Firehose using S3, let us create an Elasticsearch cluster to store the actual log data. As a best practice, we will keep logs in Elasticsearch only for a certain time period (index rotation) and send the raw data to S3 for long-term storage and audit purposes. I strongly recommend creating a bucket lifecycle policy that moves older logs to Glacier and/or deletes them.
# create a Cloudwatch log group for publishing logs of new elasticsearch cluster
aws logs create-log-group --log-group-name /aws/elasticsearch/es-twc
# allow elasticsearch service to create log streams
aws logs put-resource-policy --cli-input-json file://input-to-es-log-policy.json
# create an Elasticsearch cluster
aws es create-elasticsearch-domain --cli-input-json file://input-to-create-es.json
# create kinesis data firehose stream using inputs passed in json file (destination as Elasticsearch with backup to S3)
aws firehose create-delivery-stream --cli-input-json file://input-to-create-dest-es.json
The Elasticsearch cluster I created is a simple one for tutorial purposes only. It allows connection only from your IP Address. Do not use these settings for production or any such critical / proprietary environment.
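The bucket lifecycle policy recommended earlier in this section can be sketched as follows. This is an illustration under assumptions rather than part of the original setup: the prefix, rule ID, and day counts are hypothetical, and the client passed to `apply_lifecycle` is expected to be a boto3 S3 client.

```python
def build_lifecycle_config(prefix, glacier_after_days, expire_after_days):
    """Build a lifecycle configuration that archives, then expires, raw logs."""
    if expire_after_days <= glacier_after_days:
        raise ValueError("expiration must come after the Glacier transition")
    return {
        "Rules": [
            {
                "ID": "archive-then-expire-raw-logs",  # hypothetical rule name
                "Filter": {"Prefix": prefix},
                "Status": "Enabled",
                "Transitions": [
                    {"Days": glacier_after_days, "StorageClass": "GLACIER"}
                ],
                "Expiration": {"Days": expire_after_days},
            }
        ]
    }


def apply_lifecycle(s3_client, bucket, config):
    """Attach the lifecycle configuration to the bucket."""
    s3_client.put_bucket_lifecycle_configuration(
        Bucket=bucket, LifecycleConfiguration=config
    )


# Example usage (bucket name and prefix are placeholders):
#   import boto3
#   config = build_lifecycle_config("raw-logs/", 30, 365)
#   apply_lifecycle(boto3.client("s3"), "my-log-backup-bucket", config)
```

Keeping the configuration builder separate from the API call makes the rule easy to review and test without touching an AWS account.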
Verifying the ETL pipeline
So far, we have verified the Firehose pipeline using test data. Now we will configure the pipeline with log data being sent by Kinesis Agent. The raw logs will go to the S3 bucket and the transformed logs will go to the Elasticsearch cluster, with index rotation set to 1 month. To send the sample Spark logs, I used a simple Python function that reads the log file and executes put-record once per line.
After sending some log records, launch Kibana and follow the steps below:
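A sender of this kind can be sketched in a few lines. This is not the original function; the helper name `send_lines` and the stream and file names in the usage comment are placeholders, and the client is assumed to be a boto3 Firehose client, whose `put_record` call takes a `DeliveryStreamName` and a `Record` with a `Data` field.

```python
def send_lines(firehose_client, stream_name, lines):
    """Send each non-empty line as one Firehose record; return the count sent."""
    sent = 0
    for line in lines:
        line = line.rstrip("\r\n")
        if not line:
            continue  # skip blank lines
        firehose_client.put_record(
            DeliveryStreamName=stream_name,
            # A trailing newline keeps records separable in the raw S3 backup.
            Record={"Data": (line + "\n").encode("utf-8")},
        )
        sent += 1
    return sent


# Example usage (stream name and file path are placeholders):
#   import boto3
#   with open("Spark.log", encoding="utf-8", errors="replace") as log_file:
#       send_lines(boto3.client("firehose"), "my-delivery-stream", log_file)
```

One call per line is fine for a demo. For larger files, batching records would reduce the number of API calls.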
- Navigate to Kibana URL
- Go to “Stack Management”
- Click “Index Patterns” followed by “Create Index Pattern”
- Type the index name in the search bar (in this example it will be staginglogs-*)
- Kibana will show the correct index
- Click the index and follow the next steps as shown in Kibana UI to complete this process
After creating the index pattern, click “Discover” in the main menu; you should see the ingested log records.
- If the Lambda function has an error, Kinesis will not show any “incoming records” metrics in CloudWatch. Check the Lambda function’s CloudWatch log groups for errors.
- The payload from Kinesis is of type bytes after base64 decoding, not a string. For any string operation, it must first be converted to a string with an explicit encoding (e.g. ‘utf-8’).
- When using a transformation function with Kinesis, it is good practice to enable S3 as a backup store for the raw log records. This helps with troubleshooting and replaying logs.
- Create the Elasticsearch cluster with an index rotation period. This lets you quickly search and analyze the logs that are hot and most relevant (e.g. the last 2 months), and it helps manage the cost of the Elasticsearch cluster.
- For production and other important environments, always create the Elasticsearch cluster in a private VPC without any external access.
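The point above about bytes versus strings is easy to trip over, so here is a small sketch of the decoding step. The helper name is hypothetical, and UTF-8 is assumed as the log encoding.

```python
import base64


def decode_record_data(data_b64, encoding="utf-8"):
    """Turn the base64 payload of a Firehose record into a Python str."""
    raw = base64.b64decode(data_b64)  # this is bytes, not str
    # String operations with str arguments, such as raw.split(" "),
    # raise TypeError on bytes, so decode first.
    return raw.decode(encoding)
```

After decoding, ordinary string methods and regular expressions written for text work as expected.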
Improving Lambda function to compute custom metric
Using a transformation function has an additional benefit if you want to filter a large number of log lines and are only interested in occurrences of certain keywords. When Kinesis Data Firehose sends data to Lambda, the response from Lambda is the data that goes to Elasticsearch. This approach is similar to a window-based computation: Kinesis Data Firehose buffers data (for a minimum of 1 minute), allowing users to perform custom processing on bulk records. I have shared one such example on GitHub.
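As an illustration of this idea, the sketch below counts log levels across one buffered batch and forwards a single summary document, dropping the individual lines. It is not the example from the GitHub repository. The log layout, the set of levels, and the summary field names are all assumptions. It relies on the fact that Firehose expects every incoming `recordId` back, with a `result` of `Ok`, `Dropped`, or `ProcessingFailed`.

```python
import base64
import json
from collections import Counter

# Assumed set of log levels, and an assumed layout "<date> <time> <LEVEL> ...".
KNOWN_LEVELS = {"DEBUG", "INFO", "WARN", "ERROR", "FATAL"}


def level_of(line):
    """Return the log level of a line, or None if it is not recognized."""
    parts = line.split(" ", 3)
    if len(parts) > 2 and parts[2] in KNOWN_LEVELS:
        return parts[2]
    return None


def lambda_handler(event, context):
    records = event["records"]
    counts = Counter()
    for record in records:
        line = base64.b64decode(record["data"]).decode("utf-8")
        level = level_of(line)
        if level is not None:
            counts[level] += 1

    summary = json.dumps({"batch_size": len(records), "level_counts": dict(counts)})
    summary_b64 = base64.b64encode(summary.encode("utf-8")).decode("utf-8")

    output = []
    for index, record in enumerate(records):
        if index == 0:
            # The first record carries the summary for the whole batch.
            output.append({"recordId": record["recordId"], "result": "Ok",
                           "data": summary_b64})
        else:
            # All other records are dropped, so they never reach the destination.
            output.append({"recordId": record["recordId"], "result": "Dropped",
                           "data": record["data"]})
    return {"records": output}
```

With S3 backup enabled for the raw records, dropping lines here loses nothing: the originals stay available for replay while Elasticsearch stores only the derived metric.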
Remember to delete the following services used in this tutorial from your AWS console.
- Elasticsearch cluster
- Kinesis Firehose Delivery Stream
- Lambda Function
- S3 Bucket or Objects
- CloudWatch Log Groups