As a data engineer, I have lost too many hours madly for simple task of turning Hive or Impala console output into a usable CSV file.
Problem :
I have employee table in hive (may be impala or Spark job log), I will execute a query in hive employee table which will give console output like this:
+-----+------------------+-------+
|empid|empname |salary|
| 1| Ram Ghadiyaram| 10000|
+-----+-------+----------+--------+
I want a quick csv export for above console printed query result.
Why 🙃:
One of the many usecases is in my unit test cases, I have to use this csv file with meaningful data from any database or spark job log ( for example fixing a production issue with production data from an email from site reliablity engineer i.e. SRE) to replicate data or schema issues.
In Highly confidential world like Fintech Banks, Insurance companies.. its not possible to login in to production and see data from hive or impala or spark job log with data etc… 😅
Below is the pyspark way to achieve csv from console output (can test directly from Intellij). Here I am using '|'
as delimiter to parse from spark directly and inferred the schema as well. so this data set is exact replica of console output.
import os
import re
import sys
from pyspark.sql import SparkSession
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
spark = SparkSession.builder
.appName("String to CSV")
.getOrCreate()
# Input data as a string
input_data = """
+-----+------------------+-------+
|empid|empname |salary|
| 1| Ram Ghadiyaram| 10000|
+-----+-------+----------+--------+
""".replace("|n","n").replace("n|","n")
#remove +-----+-------+------+ from the string
input_data = re.sub(r'n[+-]+n' , 'n', input_data)
# Capture the input data as a string
df = spark.read.option("header","true").option("inferSchema","true").option("delimiter", "|").csv(spark.sparkContext.parallelize(input_data.split("n")))
df.printSchema()
df.show()
# Specify the path where you want to save the CSV file
output_path = "./output1.csv"
# Write the DataFrame as CSV
df.coalesce(1).write.csv(output_path, header=True)
# Stop the Spark session
spark.stop()
Cleaning Input :
Combining all step together :
Before :
+-----+------------------+-------+n
|empid|empname |salary|n
| 1| Ram Ghadiyaram| 10000|n
+-----+-------+----------+--------+n
After : The border lines are gone, leaving the header & data rows with internal pipes. The extra newlines at the start and end are harmless, as input_data.split("n")
and the CSV parser will ignore empty lines.
n
empid|empname |salaryn
1| Ram Ghadiyaram| 10000n
n
Now converting in to CSV using the below code
df = spark.read.option("header","true").option("inferSchema","true").option("delimiter", "|").csv(spark.sparkContext.parallelize(input_data.split("n")))
df.printSchema()
# Show the result CSV data
df.show()
# Specify the path where you want to save the CSV file
output_path = "./output1.csv"
# Write the DataFrame as CSV
df.coalesce(1).write.csv(output_path, header=True)
Final Output 😀
Generated CSV :
Note : For large csv you can compress for example :
df.write.mode("overwrite").option("compression", "gzip").csv
Why this is important :
- Unit Testing: For example generate test data from production-like queries (from any database/warehouse or spark log).
- Data Exports: Share query results with non-technical stakeholders like business or non technical users.
- Debugging: Capture and analyze console output during development.
- Generic Parsing: Will suite to any table console output, such as Spark
DataFrame show()
Using this technique of converting in to CSV from any console output, I was able to replicate data problems quickly for many datasets… for illustrative purpose demoed small data. Happy learning 😃 Happy Problem Solving 😀