
How to load partitions in Amazon Athena

Many teams rely on Athena as a serverless way to interactively query and analyze their data in S3. An important step in this approach is to ensure the Athena tables are updated with the new partitions being added in S3, so that you can transparently query data and get up-to-date results.

In this article, I will share a few approaches to loading partitions, with their pros and cons. This list is not exhaustive; you should implement the design that best suits your use case.

The data set used as an example in this article is from UCI and publicly available here. I uploaded a few sample files to an S3 bucket with a single partition column, as “s3://techwithcloud/rawdata/retail/dt=yyyy-mm-dd/file<number>.csv”. To monitor Athena API calls to this bucket, a CloudTrail trail was also created, along with a lifecycle policy to purge objects from the query output bucket.

-- Create table in Athena to read sample data which is in csv format.
CREATE EXTERNAL TABLE IF NOT EXISTS retail_rawdata (
  Invoice INT,
  StockCode STRING,
  Description STRING,
  Quantity INT
)
PARTITIONED BY (dt string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LOCATION 's3://techwithcloud/rawdata/retail'
TBLPROPERTIES (
    'skip.header.line.count'='1'
);

1. MSCK REPAIR TABLE

After creating a table in Athena, the first step is to execute an “MSCK REPAIR TABLE” query. This is also the simplest way to load all partitions, but it becomes quite a time-consuming and costly operation as the number of partitions grows. Besides, Athena might get overloaded if you have multiple tables (each mapping to its respective S3 prefix) and run this query frequently for each table; Athena adds these queries to a queue and executes them when resources are available. This thread in the Athena forum has a good discussion on the topic.

A newly created Athena table will have no records until partitions are loaded.

Athena shows zero records for a SELECT query on a newly created table.

-- Load all partitions
MSCK REPAIR TABLE retail_rawdata;

Athena shows a confirmation message after running MSCK REPAIR TABLE.

-- Running SELECT query again
select * from retail_rawdata limit 5;

After loading partitions, the SELECT query with limit 5 returns rows.

Note – The partition column is part of the SELECT query output even though it was not declared as a column inside the CREATE TABLE column list.

Running “MSCK REPAIR TABLE” again without adding new partitions won’t result in any message in the Athena console, because everything is already loaded. Next, I checked the CloudTrail logs to verify whether Athena made any Get/List calls (since this partition is now part of the metastore).

As per the CloudTrail logs, every call to “MSCK REPAIR TABLE” results in Athena scanning the S3 path provided in LOCATION. It does a ListObjects on the S3 path and then, for each partition, executes a HeadObject followed by a ListObjects (irrespective of whether the partition is loaded or not). As new partitions are added, this takes more time and adds to your cost, making it a naive way of loading partitions.

2. ALTER TABLE ADD PARTITION

Another way to add partitions is the “ALTER TABLE ADD PARTITION” statement. In this approach, users need to provide the partition value(s) they want to load. This eliminates the need to scan all the S3 prefixes, but requires users to have some mechanism that tracks the new partition values to be loaded.

-- Load single partition
alter table retail_rawdata add partition(dt='2020-12-22');

-- Load multiple partitions
alter table retail_rawdata add 
partition(dt='2020-12-22')
partition(dt='2020-12-23');

An intuitive approach might be to pre-compute the partition value (if it follows a pattern, e.g. date or timestamp) and preemptively run “ALTER TABLE ADD PARTITION” at a fixed interval. This hack may not work in real-world use cases because data doesn’t always arrive in order, sorted by partition values. As an example, a partition with value dt='2020-12-05' in S3 does not guarantee that all partitions up to '2020-12-04' are available in S3 and loaded in Athena. You must anticipate out-of-order delivery.
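For illustration, a minimal sketch of this scheduled hack (the database name and query output location are placeholders, not from the original setup):

import boto3
from datetime import date

athena = boto3.client('athena')

def load_todays_partition():
    # Pre-compute today's partition value, e.g. '2020-12-22',
    # and submit ALTER TABLE ADD PARTITION for it.
    dt = date.today().isoformat()
    query = "ALTER TABLE retail_rawdata ADD PARTITION (dt='{}')".format(dt)
    response = athena.start_query_execution(
        QueryString=query,
        QueryExecutionContext={'Database': 'default'},  # placeholder database
        ResultConfiguration={'OutputLocation': 's3://techwithcloud/query-output/'})
    return response['QueryExecutionId']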

Note – A partition needs to be loaded in Athena only once, not for every file uploaded under that partition.

3. ALTER TABLE ADD PARTITION with S3 Event Notification and Lambda

Loading partitions in Athena using S3 event notification

Instead of users tracking each partition, a cloud-native approach is to leverage S3 bucket event notifications in conjunction with Lambda. For every “s3:ObjectCreated:Put” (or “s3:ObjectCreated:*”) event, filtered on the partitioned object prefix, S3 will invoke a Lambda function, passing the full object key. The Lambda function then submits an “ALTER TABLE ADD PARTITION” query to Athena.

A sample Lambda function (using Python and the boto3 library) to submit this query to Athena:

import boto3
import os
from botocore.config import Config
from urllib.parse import unquote_plus

db_name = os.environ['DB_NAME']
table_name = os.environ['TABLE_NAME']
query_output = os.environ['QUERY_OUTPUT']

config = Config( retries = { 'max_attempts': 2, 'mode': 'standard' } )
athena = boto3.client('athena', config=config)

def submit_to_athena(partition_name):
    query = 'ALTER TABLE {table} ADD PARTITION ({partition})'.format(table=table_name, partition=partition_name)
    response = athena.start_query_execution(
                    QueryString=query,
                    QueryExecutionContext={
                        'Database': db_name
                    },
                    ResultConfiguration={
                        'OutputLocation': query_output
                    }) 
    return response['QueryExecutionId']

def lambda_handler(event, context):
    # Records is an array, but it will always contain exactly one
    # record when used with S3 bucket event notifications
    record = event['Records'][0]
    key = unquote_plus(record['s3']['object']['key'])
    partition = key.replace('rawdata/retail/', '').split('/')[0].replace('dt=','')
    athena_job_id = submit_to_athena('dt=\''+ partition +'\'')
    print('athena_job_id='+athena_job_id)

# create an event notification for S3 bucket
aws s3api put-bucket-notification-configuration --bucket techwithcloud --notification-configuration file://notification.json

# content of notification.json
{
    "LambdaFunctionConfigurations": [
        {
            "Id": "NewPartitionEvent",
            "LambdaFunctionArn": "arn:aws:lambda:ap-south-1:2008XXXXXXXX:function:AthenaLoadPartition",
            "Events": [
                "s3:ObjectCreated:*"
            ],
            "Filter": { "Key": {  "FilterRules": [ { "Name": "Prefix", "Value": "rawdata/retail" } ] } }
        }
    ]
}

After adding a few partitions in S3, the “History” tab of the Athena console confirms that the Lambda function executed successfully.

Screenshot from Athena History showing queries called from Lambda

Pros – Fastest way to load specific partitions. Doesn’t require Athena to scan the entire S3 path for new partitions. Suitable when the number of partitions being created concurrently is below the concurrent Lambda invocation limit. Ideal if only one file is uploaded per partition.

Cons – Since S3 will invoke Lambda for each object-create event, the Lambda service might throttle, and Athena might also throttle the queries. If multiple files are uploaded to a single partition, the Lambda function will either submit the same partition value again or needs a check to see whether the partition is already loaded.
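One possible way to implement such a check, assuming the table lives in the AWS Glue Data Catalog (which backs Athena’s metastore) and you accept the extra API call per event, is Glue’s get_partition call; a sketch:

import boto3
from botocore.exceptions import ClientError

glue = boto3.client('glue')

def partition_exists(database, table, dt_value):
    # get_partition raises EntityNotFoundException when the
    # partition is not yet present in the catalog.
    try:
        glue.get_partition(DatabaseName=database, TableName=table,
                           PartitionValues=[dt_value])
        return True
    except ClientError as e:
        if e.response['Error']['Code'] == 'EntityNotFoundException':
            return False
        raise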

4. ALTER TABLE ADD PARTITION with Persistent Store and Lambda

All three approaches discussed so far assume that each call to Athena will succeed, and thus don’t track whether the partitions were successfully loaded (you must wait for Athena’s response). If any error occurs during loading, those partition values should be retried. To achieve this, some sort of persistent storage is required, where every newly added partition value and its query execution id from Athena are saved until the partition is successfully loaded or max_retry is reached.

Similarly, if a partition is already loaded in Athena, then ideally it should not be submitted again. In cases where multiple files are uploaded to the same partition, each object creation will result in an event notification from S3 to Lambda. There won’t be an impact on the table, though, because Athena will throw an exception and fail the duplicate query.

There are multiple options to improve the previous design, out of which I will discuss two: one using a queue and another using a database.

4.1 Using Queue

In this approach, one queue is used to collect events from S3 and another queue stores the query execution id along with the partition value. A message is deleted only when its partition has been loaded successfully; otherwise it is put back in the queue for a later retry.

I will use SQS as an example here because of S3’s native support for it as an event notification destination. This design has the benefit of using only two Lambda functions at any given point of time (scheduled using CloudWatch). There are a few caveats, such as a maximum of 10 messages per poll and the need to process duplicate records, because in many real-world scenarios a partition folder in S3 will have multiple files uploaded. Both Lambda functions are sketched after the steps below.

Athena Load Partitions – S3, SQS, Lambda
  1. Every ObjectCreate event on the rawdata S3 bucket sends an event notification to SQS Queue-1.
  2. The first Lambda function (scheduled periodically by CloudWatch) polls Queue-1 and fetches at most 10 messages.
  3. It parses each message body to get the partition value (e.g. dt=yyyy-mm-dd), calls “ALTER TABLE ADD PARTITION”, and gets the query execution id.
  4. It sends the query execution id and the partition value to SQS Queue-2 and deletes the message from Queue-1.
  5. The second Lambda function (scheduled periodically by CloudWatch) polls SQS Queue-2.
  6. Lambda-2 checks the query execution status from Athena.
  7. It deletes the message from SQS Queue-2 if the status was Success or Failed.
  8. If the query state was “Failed” but the reason is not “AlreadyExistsException”, it adds the message back to SQS Queue-1.
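A minimal sketch of both scheduled functions, following the steps above. The environment variable names and the shape of the retry message are my assumptions; the author’s full sample code is on GitHub (linked below).

import json
import os
import boto3

sqs = boto3.client('sqs')
athena = boto3.client('athena')

QUEUE_1 = os.environ['QUEUE_1_URL']   # raw S3 event notifications
QUEUE_2 = os.environ['QUEUE_2_URL']   # in-flight Athena queries
DB_NAME = os.environ['DB_NAME']
TABLE_NAME = os.environ['TABLE_NAME']
QUERY_OUTPUT = os.environ['QUERY_OUTPUT']

def partitions_from_body(body):
    # A retry re-queued by Lambda-2 carries the partition directly;
    # otherwise extract it from each S3 record. An s3:TestEvent body
    # has no 'Records' key and yields an empty list.
    if 'partition' in body:
        return [body['partition']]
    keys = [r['s3']['object']['key'] for r in body.get('Records', [])]
    return ["dt='{}'".format(k.replace('rawdata/retail/', '')
                              .split('/')[0].replace('dt=', ''))
            for k in keys]

def lambda1_handler(event, context):
    # Steps 2-4: poll Queue-1, submit ALTER TABLE ADD PARTITION,
    # forward the execution id to Queue-2, delete from Queue-1.
    resp = sqs.receive_message(QueueUrl=QUEUE_1, MaxNumberOfMessages=10)
    for msg in resp.get('Messages', []):
        for partition in partitions_from_body(json.loads(msg['Body'])):
            qid = athena.start_query_execution(
                QueryString='ALTER TABLE {} ADD PARTITION ({})'.format(
                    TABLE_NAME, partition),
                QueryExecutionContext={'Database': DB_NAME},
                ResultConfiguration={'OutputLocation': QUERY_OUTPUT}
            )['QueryExecutionId']
            sqs.send_message(QueueUrl=QUEUE_2, MessageBody=json.dumps(
                {'qid': qid, 'partition': partition}))
        sqs.delete_message(QueueUrl=QUEUE_1, ReceiptHandle=msg['ReceiptHandle'])

def lambda2_handler(event, context):
    # Steps 5-8: poll Queue-2, check query status, retry real failures.
    resp = sqs.receive_message(QueueUrl=QUEUE_2, MaxNumberOfMessages=10)
    for msg in resp.get('Messages', []):
        item = json.loads(msg['Body'])
        status = athena.get_query_execution(
            QueryExecutionId=item['qid'])['QueryExecution']['Status']
        if status['State'] in ('SUCCEEDED', 'FAILED'):
            sqs.delete_message(QueueUrl=QUEUE_2,
                               ReceiptHandle=msg['ReceiptHandle'])
            if (status['State'] == 'FAILED' and
                    'AlreadyExists' not in status.get('StateChangeReason', '')):
                sqs.send_message(QueueUrl=QUEUE_1, MessageBody=msg['Body'])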

There are no charges for Data Definition Language (DDL) statements like CREATE/ALTER/DROP TABLE, statements for managing partitions, or failed queries.

https://aws.amazon.com/athena/pricing/

Another option is to add an SQS trigger for the Lambda function. That design would result in a lot of Lambda invocations (one per event) and the same key prefix being sent to multiple function instances. I prefer to control the invocation of the Lambda functions such that at any given point of time only one Lambda is polling SQS, thus eliminating concurrent receipt of duplicate messages. Having a single consumer for SQS simplifies a lot of things.

Sample code for both Lambda functions is available on GitHub.

Note – After configuring SQS as a destination for S3 events, an “s3:TestEvent” message is generated and sent by S3. Be careful to remove this message from the queue, or add logic in the Lambda function to ignore such messages.
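A small guard for this can sit at the top of the message-processing loop; S3’s test message carries an “Event” field set to “s3:TestEvent”:

import json

def is_s3_test_event(message_body):
    # S3 sends {"Service":"Amazon S3","Event":"s3:TestEvent",...}
    # once, when the notification configuration is created.
    body = json.loads(message_body)
    return body.get('Event') == 's3:TestEvent'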

4.2 Using Database

In this approach, a DB is used to store the partition value (primary key), query_execution_id, status, and creation_time. This solution adds some cost compared to the previous ones, but a major benefit of this design is that you don’t need to write additional logic to prevent loading the same partition value again. Besides, almost every solution has some sort of database (RDS or self-managed), so an existing DB instance can be leveraged for this use case.

Athena Load Partitions – S3, SQS, Lambda, RDS

An SQS queue is used to collect records directly from S3 for every “ObjectCreate” event. Another option is to invoke Lambda directly from S3, but additional logic then needs to be coded to handle throttling errors and a DLQ. Using a queue between S3 and Lambda provides the benefit of limiting Lambda invocations as your use case requires, and also limits the number of concurrent writes to RDS, reducing the risk of exhausting DB connections.

-- Sample table creation in PostgreSQL 
CREATE TABLE athena_partitions
(
    p_value character varying NOT NULL,
    query_exec_id character varying,
    status character varying NOT NULL DEFAULT '',
    creation_time timestamp NOT NULL DEFAULT NOW(),
    PRIMARY KEY (p_value)
);

Our first Lambda function (scheduled using CloudWatch) will poll SQS and get at most 10 messages. It will extract the partition values and do a bulk UPSERT operation (insert if not exists):

-- Postgresql
INSERT INTO athena_partitions (p_value) VALUES ('dt=2020-12-25'), ('dt=2020-12-26')
ON CONFLICT ON CONSTRAINT athena_partitions_pkey DO NOTHING;
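A sketch of that upsert step from the first Lambda function, assuming the psycopg2 library and connection details passed via environment variables (the variable names are placeholders):

import os
import psycopg2
from psycopg2.extras import execute_values

conn = psycopg2.connect(host=os.environ['DB_HOST'],
                        dbname=os.environ['DB_NAME'],
                        user=os.environ['DB_USER'],
                        password=os.environ['DB_PASSWORD'])

def upsert_partitions(p_values):
    # p_values: e.g. ['dt=2020-12-25', 'dt=2020-12-26']
    # execute_values expands the VALUES list in a single statement.
    with conn.cursor() as cur:
        execute_values(
            cur,
            'INSERT INTO athena_partitions (p_value) VALUES %s '
            'ON CONFLICT ON CONSTRAINT athena_partitions_pkey DO NOTHING',
            [(v,) for v in p_values])
    conn.commit()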

A second Lambda function (scheduled from CloudWatch) will perform a select operation:

-- Read rows where partition values have not been loaded completely
SELECT p_value FROM athena_partitions WHERE status = '' OR status = 'STARTED' LIMIT 10;

For returned rows where status = '', the function will call “ALTER TABLE ADD PARTITION” and update the row with status='STARTED' and the query execution id from Athena.

--Sample update in PostgreSQL after receiving query execution id from Athena
UPDATE athena_partitions 
SET
  query_exec_id = 'a1b2c3d4-5678-90ab-cdef', 
  status = 'STARTED' 
WHERE
  p_value = 'dt=2020-12-25'

For returned rows where status = 'STARTED', the function will check the query execution status from Athena and update the status accordingly. A failed query means the status is reset to empty, so the partition is retried later.

--Sample update in PostgreSQL if loading failed (to be retried later)
UPDATE athena_partitions 
SET
  status = '' 
WHERE
  p_value = 'dt=2020-12-25'
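Putting the two updates together, the second Lambda function’s per-row logic could look roughly like this sketch (conn is an open psycopg2 connection, as in the earlier sketch):

import boto3

athena = boto3.client('athena')

def refresh_status(conn, p_value, query_exec_id):
    # Ask Athena how the ALTER TABLE query ended, then update the row.
    status = athena.get_query_execution(
        QueryExecutionId=query_exec_id)['QueryExecution']['Status']
    if status['State'] == 'SUCCEEDED':
        new_status = 'SUCCEEDED'
    elif status['State'] == 'FAILED':
        new_status = ''   # empty status => picked up again on the next run
    else:
        return            # still QUEUED/RUNNING, check again later
    with conn.cursor() as cur:
        cur.execute('UPDATE athena_partitions SET status = %s WHERE p_value = %s',
                    (new_status, p_value))
    conn.commit()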

To delete rows, I recommend having either a cron job or another Lambda function that runs periodically and deletes rows whose “creation_time” column value is older than ‘X’ minutes/hours. The value of ‘X’ depends on your use case, i.e. when you expect all files of a partition to be available in S3 (e.g. 1 hour, 12 hours, 7 days).
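A minimal sketch of that cleanup step, reusing the same connection handling and assuming PostgreSQL’s make_interval function (the 12-hour default is an arbitrary example):

def purge_old_rows(conn, older_than_hours=12):
    # Delete rows for partitions that should be fully delivered by now.
    with conn.cursor() as cur:
        cur.execute('DELETE FROM athena_partitions '
                    'WHERE creation_time < NOW() - make_interval(hours => %s)',
                    (older_than_hours,))
    conn.commit()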

You can find nice examples of how to connect to and query RDS in the references below.

References

Thanks for reading; I welcome your feedback.
