AWS Athena SQL parser for table and column audit logging
This is the sixth article in our series on parsing SQL in different database technologies and SQL dialects. We explored SQL parsing on Snowflake, MS SQL Server, Oracle , Databricks and Redshift in our earlier blog posts. We cover SQL parsing on AWS Athena in this blog post and go through a practical example of table and column audit logging.
We provide practical examples of deconstructing SQL from the AWS Athena query history. Additionally, we will present some code that utilises the FlowHigh SQL parser SDK to programmatically parse SQL from AWS Athena. The parsing of AWS Athena can be automated using the SDK.
In another post on the Sonra blog, I go into great depth on the use cases of using an SQL parser for data engineering and data governance use cases. The mentioned article is a complete deep dive into SQL parsing, how it works and how you can use it for use cases in data engineering and data governance.
FlowForward.
All Things Data Engineering
Straight to Your Inbox!
SQL parser for AWS Athena
Sonra has created an online SQL parser. Our vision is to be SQL dialect agnostic. The parser also works with AWS Athena. It is called FlowHigh. Our SaaS platform includes a UI for manual SQL parsing as well as an SDK for managing bulk SQL parsing requirements or automating the process. We demonstrate FlowHigh’s capabilities by parsing the query history of AWS Athena.
Let’s look at both options starting with the SDK for automating SQL parsing.
Programmatically parsing the AWS Athena query history with the FlowHigh SDK
AWS Athena provides a powerful API that allows users to interact with its features programmatically. One of the valuable capabilities of this API is the ability to retrieve the history of SQL queries executed in Athena. By using the list_query_executions() method from the Athena client in the Boto3 library, users can obtain a list of query execution IDs. Each of these IDs corresponds to an individual SQL query that was run in Athena. To further obtain the details of each query, including the actual SQL text, users can pass these execution IDs to the get_query_execution() method. This not only returns the SQL text but also provides other metadata such as the query’s execution time, status, and data scanned. When combined, these methods provide a comprehensive view of the SQL query history in Athena, making it easier for users to audit, review, or replicate past queries.
Sample output of api methods.
list_query_executions()
Will return list of query ids execute in the specified workgroup.
1 |
['73ea3d92-5de8-4d20-a564-ea5e15eee6fd', '3b2802c4-16cc-49c4-b39a-4aecdbf44061', '5fe5d798-22b2-404b-9333-a8034988472e', '735b21be-4070-4371-9372-bd4615b0561b', '0746d0ac-4cc5-40f2-a4fe-c9059dcfecae'] |
get_query_execution()
The function will retrieve the specifics of the query when provided with a query ID as its argument as shown below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 |
{ "QueryExecution":{ "QueryExecutionId":"73ea3d92-5de8-4d20-a564-ea5e15eee6fd", "Query":"select * from testdb.test", "StatementType":"DML", "ResultConfiguration":{ "OutputLocation":"s3://athenaan/athena/Unsaved/2023/09/22/73ea3d92-5de8-4d20-a564-ea5e15eee6fd.csv", "AclConfiguration":{ "S3AclOption":"BUCKET_OWNER_FULL_CONTROL" } }, "ResultReuseConfiguration":{ "ResultReuseByAgeConfiguration":{ "Enabled":false } }, "QueryExecutionContext":{ "Database":"testdb", "Catalog":"awsdatacatalog" }, "Status":{ "State":"FAILED", "StateChangeReason":"HIVE_BAD_DATA: Not valid Parquet file: s3://databucketathena/annual-enterprise-survey-2021-financial-year-provisional-size-bands-csv.csv expected magic number: PAR1 got: s)\r\n", "SubmissionDateTime":datetime.datetime(2023, 9, 22, 11, 2, 24, 905000, "tzinfo=tzlocal())", "CompletionDateTime":datetime.datetime(2023, 9, 22, 11, 2, 25, 264000, "tzinfo=tzlocal())", "AthenaError":{ "ErrorCategory":2, "ErrorType":1002, "Retryable":false, "ErrorMessage":"HIVE_BAD_DATA: Not valid Parquet file: s3://databucketathena/annual-enterprise-survey-2021-financial-year-provisional-size-bands-csv.csv expected magic number: PAR1 got: s)\r\n" } }, "Statistics":{ "EngineExecutionTimeInMillis":256, "DataScannedInBytes":0, "TotalExecutionTimeInMillis":359, "QueryQueueTimeInMillis":91, "QueryPlanningTimeInMillis":52, "ServiceProcessingTimeInMillis":12, "ResultReuseInformation":{ "ReusedPreviousResult":false } }, "WorkGroup":"test", "EngineVersion":{ "SelectedEngineVersion":"AUTO", "EffectiveEngineVersion":"Athena engine version 3" }, "SubstatementType":"SELECT" }, "ResponseMetadata":{ "RequestId":"8ad6b8a6-5693-4cf6-b666-eab1145d76fe", "HTTPStatusCode":200, "HTTPHeaders":{ "date":"Tue, 26 Sep 2023 16:39:02 GMT", "content-type":"application/x-amz-json-1.1", "content-length":"2428", "connection":"keep-alive", "x-amzn-requestid":"8ad6b8a6-5693-4cf6-b666-eab1145d76fe" }, "RetryAttempts":0 } } |
The python code in the next section shows how the query history is pulled from AWS Athena and processed using the FlowHigh SDK:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 |
import boto3 import pandas as pd from flowhigh.utils.converter import FlowHighSubmissionClass aws_access_key_id = 'key_id' aws_secret_access_key = 'access_key' # Initialize a session with the provided credentials and region session = boto3.Session( aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key, region_name='region' ) # Use the session to create an Athena client athena = session.client('athena') def list_query_executions(workgroup_name): """ List query executions for the specified workgroup. """ execution_ids = [] next_token = None while True: if next_token: response = athena.list_query_executions(WorkGroup=workgroup_name, NextToken=next_token) else: response = athena.list_query_executions(WorkGroup=workgroup_name) execution_ids.extend(response['QueryExecutionIds']) next_token = response.get('NextToken') if not next_token: break return execution_ids def get_query_text(query_id): """ Retrieve the query text for a given query ID. """ response = athena.get_query_execution(QueryExecutionId=query_id) return response['QueryExecution']['Query'] # Retrieve and print query execution IDs and their corresponding query texts query_ids = list_query_executions("workgroup_name") result = [(query_id, get_query_text(query_id)) for query_id in query_ids] data = [] for query_id, query_text in result: fh = FlowHighSubmissionClass.from_sql(query_text) json_msg = fh.json_message entry = {'query_id': query_id, 'fh_response': json_msg} data.append(entry) data_df = pd.DataFrame(data) print(data_df) |
Analysing the output of the FlowHigh SQL parser
The FlowHigh SQL parser for AWS Athena processes incoming SQL queries and outputs the results in either JSON or XML format. For example, from our collected query history, the parser generates a comprehensive JSON or XML representation of the SQL query. This detailed output includes data on filter criteria, retrieved columns, aliases used, join conditions, tables, and other clauses of the SQL command.
Let’s go through an example. We will use a simple SQL query to demonstrate some of the features of the parser.
1 2 3 4 5 6 7 8 9 10 |
SELECT u.username ,w.URL ,COUNT(w.response_code) AS total_requests FROM weblogs_db.weblogs w JOIN weblogs_db.users u ON w.user_id=u.user_id WHERE w.response_code='200' -- Only successful requests GROUP BY u.username ,w.URL ORDER BY total_requests DESC; |
The SQL parser also supports other DML and DDL statements such as CREATE, INSERT, UPDATE, MERGE etc.
Let’s examine the XML output from the FlowHigh SQL parser for the given SQL query. This XML format is more concise than its JSON cousin. Both types of messages have the same content though.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
<?xml version="1.0" encoding="UTF-8"?> <parSeQL version="1.0" status="OK" ts="2023-09-26T08:26:22.930Z" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="https://flowhigh.sonra.io/flowhigh_v1.0.xsd"> <statements> <statement pos="0-287"> <ds pos="0-287" type="root" subType="inline"> <out> <attr pos="7-10" oref="C1"/> <attr pos="26-5" oref="C2"/> <func xsi:type="fagg" pos="40-40" alias="total_requests" name="COUNT"> <attr pos="46-15" oref="C3"/> </func> </out> <in> <ds pos="89-20" alias="w" oref="T1"/> <join type="inner" definedAs="explicit"> <ds pos="118-18" alias="u" oref="T2"/> <op pos="145-19" type="EQ"> <attr pos="145-9" oref="C4"/> <attr pos="155-9" oref="C5"/> </op> </join> </in> <filter xsi:type="filtreg"> <op pos="172-21" type="EQ"> <attr pos="172-15" oref="C3"/> <const>'200'</const> </op> </filter> <agg xsi:type="aggreg"> <attr pos="232-10" oref="C1"/> <attr pos="251-5" oref="C2"/> </agg> <sort> <attr pos="268-14" direction="desc" sref="40-40" oref="C3"/> </sort> </ds> </statement> </statements> <DBOHier> <dbo oid="S1" type="SCHEMA" name="weblogs_db"> <dbo oid="T1" type="TABLE" name="weblogs"> <dbo oid="C2" type="COLUMN" name="URL"/> <dbo oid="C3" type="COLUMN" name="response_code"/> <dbo oid="C4" type="COLUMN" name="user_id"/> </dbo> <dbo oid="T2" type="TABLE" name="users"> <dbo oid="C1" type="COLUMN" name="username"/> <dbo oid="C5" type="COLUMN" name="user_id"/> </dbo> </dbo> </DBOHier> </parSeQL> |
The XML output generated by the FlowHigh SQL parser for AWS Athena provides an in-depth analysis of the SQL statement.
Tables and columns
From the XML structure we see that tables weblogs and users from the schema weblogs_db are referenced in the SQL query. The columns involved from these tables are URL, response_code, user_id from the weblogs table, and username, user_id from the users table. The name of the schemas, tables, and columns can be found in the xml from the FH parser.
1 2 3 4 5 6 7 8 9 10 11 12 13 |
<DBOHier> <dbo oid="S1" type="SCHEMA" name="weblogs_db"> <dbo oid="T1" type="TABLE" name="weblogs"> <dbo oid="C2" type="COLUMN" name="URL"/> <dbo oid="C3" type="COLUMN" name="response_code"/> <dbo oid="C4" type="COLUMN" name="user_id"/> </dbo> <dbo oid="T2" type="TABLE" name="users"> <dbo oid="C1" type="COLUMN" name="username"/> <dbo oid="C5" type="COLUMN" name="user_id"/> </dbo> </dbo> </DBOHier> |
Joins
There is an inner join identified between the weblogs table (with alias w) and the users table (with alias u). The join condition dictates that the user_id from the weblogs table matches the user_id from the users table.
You can find the JOIN in the below section of the XML output.
1 2 3 4 5 6 7 |
<join type="inner" definedAs="explicit"> <ds pos="118-18" alias="u" oref="T2"/> <op pos="145-19" type="EQ"> <attr pos="145-9" oref="C4"/> <attr pos="155-9" oref="C5"/> </op> </join> |
Internally the column user_id from T1 and user_id from T2 is referenced as C4 and C5 , which can be looked up from the query hierarchy <DBOHier> at the end of the XML message.
1 2 3 4 5 6 7 8 9 10 11 12 13 |
<DBOHier> <dbo oid="S1" type="SCHEMA" name="weblogs_db"> <dbo oid="T1" type="TABLE" name="weblogs"> <dbo oid="C2" type="COLUMN" name="URL"/> <dbo oid="C3" type="COLUMN" name="response_code"/> <dbo oid="C4" type="COLUMN" name="user_id"/> </dbo> <dbo oid="T2" type="TABLE" name="users"> <dbo oid="C1" type="COLUMN" name="username"/> <dbo oid="C5" type="COLUMN" name="user_id"/> </dbo> </dbo> </DBOHier> |
GROUP BY
The XML indicates an aggregation based on the username column from the users table and the URL column from the weblogs table.
You can find the GROUP BY in the aggregation section of the XML output
1 2 3 4 |
<agg xsi:type="aggreg"> <attr pos="232-10" oref="C1"/> <attr pos="251-5" oref="C2"/> </agg> |
Internally the column username is referenced as C1 and column URL is referenced as C2, which can be looked up from the query hierarchy <DBOHier> at the end of the XML message.
1 2 3 4 5 6 7 8 9 10 11 12 13 |
<DBOHier> <dbo oid="S1" type="SCHEMA" name="weblogs_db"> <dbo oid="T1" type="TABLE" name="weblogs"> <dbo oid="C2" type="COLUMN" name="URL"/> <dbo oid="C3" type="COLUMN" name="response_code"/> <dbo oid="C4" type="COLUMN" name="user_id"/> </dbo> <dbo oid="T2" type="TABLE" name="users"> <dbo oid="C1" type="COLUMN" name="username"/> <dbo oid="C5" type="COLUMN" name="user_id"/> </dbo> </dbo> </DBOHier> |
FILTER
The query incorporates a filter condition on the response_code column from the weblogs table. Specifically, it selects records where the response_code value is ‘200’. This can be found in the filter section of the XML output:
1 2 3 4 5 |
<filter xsi:type="filtreg"> <op pos="172-21" type="EQ"> <attr pos="172-15" oref="C3"/> <const>'200'</const> </op> |
Internally the column response_code is referenced as C3, which can be looked up from the query hierarchy <DBOHier> at the end of the XML message.
1 2 3 4 5 6 7 8 9 10 11 12 13 |
<DBOHier> <dbo oid="S1" type="SCHEMA" name="weblogs_db"> <dbo oid="T1" type="TABLE" name="weblogs"> <dbo oid="C2" type="COLUMN" name="URL"/> <dbo oid="C3" type="COLUMN" name="response_code"/> <dbo oid="C4" type="COLUMN" name="user_id"/> </dbo> <dbo oid="T2" type="TABLE" name="users"> <dbo oid="C1" type="COLUMN" name="username"/> <dbo oid="C5" type="COLUMN" name="user_id"/> </dbo> </dbo> </DBOHier> |
ORDER BY
The results of the query are sorted in descending order based on the count of response_code column from the weblogs table. This ordering ensures that the records with a specific response code appear at the top of the output. This can be found in the sort section of the XML output:
1 2 3 |
<sort> <attr pos="268-14" direction="desc" sref="40-40" oref="C3"/> </sort> |
While AWS Athena may not always provide granular details about specific tables and columns in a query, FlowHigh supplements this information. It not only identifies the tables and columns but also zeroes in on the columns involved in join operations.
FlowHigh User Interface for SQL parsing
You can also access the FlowHigh SQL parser through the web based user interface. The below figure shows how FlowHigh provides the information about tables in a SQL query by grouping them into types of tables.
We took the Athena SQL query example and parsed it through the web user interface.
We got the weblogs and users tables back. As you can see the web UI also classifies these two tables as physical tables. Other types of tables are CTE, Views etc.
When we select a table name, it reveals the associated column names. For instance, by selecting the WEBLOGS table, we can view its corresponding column names.
Likewise FlowHigh can be used to get columns used in a where conditions ,order by, group by and joins in the SQL query.
Columns used in GROUP BY / ORDER BY clause
This figure shows how FlowHigh can be used to filter out the columns used in order by and group by clause.
Filter columns
This figure shows how FlowHigh can be used to filter out the columns used in where clauses.
Columns in join conditions
This figure shows how FlowHigh can be used to filter out the columns used and types of join. I have outlined in detail how this type of information can be very useful for data engineering scenarios to identify indexes, cluster keys etc.
Need more?
FlowHigh ships with two other modules
- FlowHigh SQL Analyser. The Analyser checks for issues in your SQL coed. It detects 30+ anti patterns in SQL code
I have written up a blog post on automatically detecting bad SQL code.
- FlowHigh SQL Visualiser. Visualising SQL queries helps understand complex queries. It lets developers see how each part of the query contributes to the final result, making it easier to understand and debug.
You can try FlowHigh yourself.