Converting ACORD XML to Avro row storage
In this example we will use Flexter to convert an XML file to the Apache Avro format. We then query and analyse the output in the Spark-Shell.
Flexter can generate a target schema from an XML file or from a combination of XML and XML Schema (XSD) files. We will use data from the ACORD RLC Insurance and Reinsurance Service Business Message Specification, which is the result of a cooperative industry effort of RLC members, including ins/reinsurers, ins/reinsurance intermediaries, clients/cedents and technology service providers.
We will be parsing the RegulatoryReporting and the Placing Message template files with Flexter, together with the ACORD RLC XML Schema, which is compiled in conjunction with industry standards to facilitate fast, accurate data exchange. The XML and XSD files can be downloaded from the link here. The XML files used are
- RegulatoryReporting-template2015-04.xml
- Placing-template2015-04.xml
And the XSD file used is
- Jv-Ins-Reinsurance-2015-04.xsd
Both the XML files and the XSD are available, and we use information from both to generate the target schema.
# We first test that the XML file is well formed by simulating the execution with the skip switch (-s)
$ xml2er -s RegulatoryReporting-template2015-04.xml

# Next we extract the statistics from the XML file. Statistics are used to generate the target schema.
# We use the xml2er command line tool without the skip (-s) switch.
$ xml2er RegulatoryReporting-template2015-04.xml
…
# The result of this operation is an ID (origin: 61). We will use this ID in subsequent steps
origin: 61
job: 160

# Some useful execution statistics
startup: 10444 ms
parse: 1445 ms
stats: 10321 ms
map: 3 ms
unique xpaths: 219

# Similarly we use xml2er for the Placing-template2015-04.xml file
$ xml2er Placing-template2015-04.xml
…
# The result of this operation is an ID (origin: 62). We will use this ID in subsequent steps
# schema
origin: 62
job: 161

# statistics
startup: 4541 ms
parse: 1287 ms
stats: 10197 ms
map: 3 ms
unique xpaths: 1206
Now that we have gathered statistics from our XML samples, we can generate the logical target schema with the xsd2er command line tool using the -k switch (-k is a shortcut for --use-stats).
-k, --use-stats <ID[,ID2..]>   Use the stats to generate the new schema
Let’s go through the steps:
# Template
$ xsd2er -s -k<Stats ID> -g<Optimization Level> INPUTPATH

# We first simulate generating the target schema with the -s skip switch
$ xsd2er -s -k60 -g1 Jv-Ins-Reinsurance-2015-04.xsd

# Everything worked. Now running the command for real without skip
$ xsd2er -k60 -g1 Jv-Ins-Reinsurance-2015-04.xsd
…
# metadata
path: jdbc:postgresql:XXname
user: XXuser
password: ***
map: 1

# input
use-stats: 60

# schema
job: 163

19:57:40.444 INFO Initialized in 492 milliseconds
19:57:40.449 INFO Reading metadata stats
19:57:40.506 INFO Parsing xml schema
19:57:41.076 INFO Parsing Jv-Ins-Reinsurance-2015-04.xsd
19:57:41.941 INFO Parsing Acord-Repository_v-1-3-0-RLC-Slice.xsd
19:57:41.959 INFO Parsing AcordMsgSvc_base_v-1-5-0.xsd
19:57:41.976 INFO Linking xml schema
19:57:41.978 INFO Linking Jv-Ins-Reinsurance-2015-04.xsd
19:57:41.980 INFO Linking Acord-Repository_v-1-3-0-RLC-Slice.xsd
…
# schema
origin: 60
logical: 26
job: 163

# statistics
startup: 492 ms
stats-load: 60 ms
parse: 1768 ms
build: 409 ms
write: 266 ms
map: 2227 ms
xpaths: 219

# Next, we execute similar steps for the Placing-template2015-04.xml file
$ xsd2er -k61 -g1 Jv-Ins-Reinsurance-2015-04.xsd

# metadata
path: jdbc:postgresql:XXname
user: XXuser
password: ***
map: 1

# input
use-stats: 61

# schema
job: 164

20:00:38.291 INFO Initialized in 492 milliseconds
20:00:38.295 INFO Reading metadata stats
20:00:38.343 INFO Parsing xml schema
20:00:38.728 INFO Parsing Jv-Ins-Reinsurance-2015-04.xsd
20:00:39.631 INFO Parsing Acord-Repository_v-1-3-0-RLC-Slice.xsd
20:00:39.648 INFO Parsing AcordMsgSvc_base_v-1-5-0.xsd
20:00:39.665 INFO Linking xml schema
20:00:39.667 INFO Linking Jv-Ins-Reinsurance-2015-04.xsd
20:00:39.668 INFO Linking Acord-Repository_v-1-3-0-RLC-Slice.xsd
20:00:39.668 INFO Linking AcordMsgSvc_base_v-1-5-0.xsd

# schema
origin: 61
logical: 27
job: 164

# statistics
startup: 492 ms
stats-load: 47 ms
parse: 1641 ms
build: 415 ms
write: 133 ms
map: 581 ms
xpaths: 219
Happy days. Now we use the logical schema IDs from the output above (logical: 26 and 27) to convert the XML data to Avro.
Next we need to add the Avro package so that xml2er can use it. We can pass spark-submit parameters in the following format.
# Template for submitting spark parameters with xml2er
$ xml2er [spark-submit params...] -- [xml2er params...]
Here we use the --packages and --conf spark-submit parameters to make the Avro library and its dependencies available, so the XML files can be parsed and the output written in the desired format.
# First simulating the conversion process
$ xml2er -s --packages com.databricks:spark-avro_2.11:3.2.0 --conf spark.sql.avro.compression.codec=deflate --conf spark.sql.avro.deflate.level=5 -- -o $PWD -l26 RegulatoryReporting-template2015-04.xml -f com.databricks.spark.avro
When the command is ready, removing --skip or -s allows us to process the data. The -o switch sets the output directory; using the $PWD shell variable writes the output files to the current working directory. The -f switch selects the output format (e.g. jdbc, parquet, orc, json, csv, tsv); the default is Parquet, but in our case we want Avro, so we pass the com.databricks.spark.avro data source.
# Generating the data from the XML files into avro
$ xml2er --packages com.databricks:spark-avro_2.11:3.2.0 --conf spark.sql.avro.compression.codec=deflate --conf spark.sql.avro.deflate.level=5 -- -o $PWD -l26 RegulatoryReporting-template2015-04.xml -f com.databricks.spark.avro
…
# schema
origin: 61
logical: 26
job: 160

# Some useful execution statistics
startup: 10444 ms
parse: 1445 ms
stats: 10321 ms
map: 3 ms
unique xpaths: 219

# Similarly doing it for the Placing-template2015-04.xml file
$ xml2er --packages com.databricks:spark-avro_2.11:3.2.0 --conf spark.sql.avro.compression.codec=deflate --conf spark.sql.avro.deflate.level=5 -- -o $PWD -l27 Placing-template2015-04.xml -f com.databricks.spark.avro
…
# schema
origin: 67
logical: 27
job: 170

# statistics
startup: 4476 ms
load: 12137 ms
parse: 325 ms
write: 17400 ms
stats: 1835 ms
map: 3 ms
unique xpaths: 1206
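The two --conf settings passed through to Spark are options of the spark-avro library: spark.sql.avro.compression.codec selects the codec (uncompressed, snappy or deflate) and spark.sql.avro.deflate.level sets the deflate level. As a minimal illustration outside of Flexter, the same options apply when writing any DataFrame as Avro from the spark-shell; someDf and the output path below are placeholders, not part of this walkthrough.

# Illustration only: the same spark-avro compression options when writing Avro directly from Spark
# someDf and the output path are placeholders, not part of the Flexter run
scala> import com.databricks.spark.avro._
scala> spark.conf.set("spark.sql.avro.compression.codec", "deflate")
scala> spark.conf.set("spark.sql.avro.deflate.level", "5")
scala> someDf.write.avro("/tmp/avro-deflate-example")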
We can find the extracted Avro files in the output folder: Flexter creates one directory per target table in the current working directory. We can use 'ls' to list the generated folders and look inside one of the .avro folders as shown below.
# Looking at the avro files generated
$ ls -l
total 92
drwxr-xr-x 2 root root 4096 Apr 22 20:17 Allocation1.com.databricks.spark.avro
drwxr-xr-x 2 root root 4096 Apr 22 20:17 Allocation2.com.databricks.spark.avro
drwxr-xr-x 2 root root 4096 Apr 22 20:17 Brokerage.com.databricks.spark.avro
drwxr-xr-x 2 root root 4096 Apr 22 20:17 Clause.com.databricks.spark.avro
…
# Looking inside an avro folder
$ cd TaxProvision.com.databricks.spark.avro/
$ ls
part-00000-b2e73d26-6c85-45ab-8516-4490aab81e9e.avro  _SUCCESS
The complete list of avro files parsed using Flexter can be downloaded here for Placing-template and here for RegulatoryReporting-template.
# The Avro library can be added to Spark jobs launched through spark-shell or spark-submit by using the --packages command line option
$ spark-shell --packages com.databricks:spark-avro_2.11:4.0.0
Spark context Web UI available at http://172.17.0.2:4041
Spark context available as 'sc' (master = local[*], app id = local-1515355322712).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.1
      /_/

Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_151)
Type in expressions to have them evaluated.
Type :help for more information.

scala>
Once the spark-shell has started, we can read the generated Avro files and load them as DataFrames in Spark.
# Imports needed for the .avro method to be available
scala> import com.databricks.spark.avro._
scala> import org.apache.spark.sql.SparkSession

# The Avro records are converted to Spark SQL types on read
# Creating a dataframe from one of the generated avro folders
scala> val df = spark.read.avro("/path/TaxProvision.com.databricks.spark.avro")
df: org.apache.spark.sql.DataFrame = [FK_TaxProvisions: string, TaxAuthorityLocation_Country: string ... 5 more fields]
...
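Each of the output folders can be loaded in the same way. As a small sketch, assuming the table names from the directory listing above and a placeholder /path for the output directory, we can load several tables at once into a map of DataFrames:

# Sketch: loading several of the generated tables at once
# the table names come from the directory listing above; /path is a placeholder
scala> val tables = Seq("TaxProvision", "Brokerage", "Clause")
scala> val dfs = tables.map(t => t -> spark.read.avro(s"/path/$t.com.databricks.spark.avro")).toMap
scala> dfs("TaxProvision").count()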
We can take a look at the schema of the DataFrames we created and do some preliminary analysis of the parsed data before proceeding further. For example, let's look at the TaxProvision table created above.
# Printing the schema of the dataframe created above
scala> df.printSchema()
root
 |-- FK_TaxProvisions: string (nullable = true)
 |-- TaxAuthorityLocation_Country: string (nullable = true)
 |-- TaxClass: string (nullable = true)
 |-- TaxPayerPartyRole: string (nullable = true)
 |-- TaxRateBasis: string (nullable = true)
 |-- TaxType: string (nullable = true)
 |-- TaxTypeDescription: string (nullable = true)
We can see the names and data types of the various columns. We can also perform some basic analysis on the dataset in Scala and look at the values present.
# Showing the values of the TaxClass column in df
scala> df.select("TaxClass").show()
+-----------------+
|         TaxClass|
+-----------------+
|      premium_tax|
|parafiscal_charge|
+-----------------+

# Showing the values of the TaxType column in df
scala> df.select("TaxType").show()
+--------------------+
|             TaxType|
+--------------------+
|insurance_premium...|
|     social_security|
+--------------------+
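If a column contains duplicates and we are only interested in the unique values, .distinct() can be chained in before show(); a minimal example on the same DataFrame:

# Showing only the distinct values of the TaxType column
scala> df.select("TaxType").distinct().show()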
Various basic data processing tasks can be performed on the DataFrames we generated. The sql function on a SparkSession also enables applications to run SQL queries programmatically and returns the result as a DataFrame; a sketch of this follows the examples below.
# Filtering the data frame based on the values of a certain column
scala> df.filter($"<column-name>" > value).show()

# Grouping by the values of a column and creating a count
scala> df.groupBy("<column-name>").count().show()
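To use the sql function mentioned above, the DataFrame first needs to be registered as a temporary view. A minimal sketch, where the view name tax_provision is arbitrary:

# Registering the dataframe as a temporary view and querying it with SQL
scala> df.createOrReplaceTempView("tax_provision")
scala> spark.sql("SELECT TaxClass, count(*) AS cnt FROM tax_provision GROUP BY TaxClass").show()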
Once we have DataFrames for all the Avro files, we can also use the ER diagram of the parsed XML files and join the tables in the spark-shell according to our analysis requirements. The ER mapping for the RegulatoryReporting-template2015-04.xml and the Placing-template2015-04.xml files can be found here.
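As a sketch of how the ER mapping drives such joins, the FK_ columns generated by Flexter link a child table to its parent. The parent table and its key column below are hypothetical placeholders; the actual names should be taken from the ER diagram.

# Hypothetical join along a foreign key; TaxProvisions and PK_TaxProvisions are placeholder names
scala> val parentDf = spark.read.avro("/path/TaxProvisions.com.databricks.spark.avro")
scala> val joined = df.join(parentDf, df("FK_TaxProvisions") === parentDf("PK_TaxProvisions"))
scala> joined.show()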