Query BigQuery with Python

This week, I wanted to connect to BigQuery using Python. BigQuery is a Google service for storing and processing large volumes of data quickly. We use it to store Google billing data and wanted to generate some reports by running different SQL queries.

from google.cloud import bigquery
from google.oauth2 import service_account

# Placeholder: fill in your service account key as a dict
# (for example, the parsed contents of the downloaded JSON key file).
auth_json = {}

credentials = service_account.Credentials.from_service_account_info(auth_json)

client = bigquery.Client(project="project name", credentials=credentials)

query_job = client.query("""your query inside""")

results = query_job.result()  # Waits for job to complete.

for row in results:
    # Each selected column is available as an attribute of the row;
    # replace id with the fields your query returns.
    print("{}".format(row.id))


 

The code above shows how to connect to BigQuery and run a query. The result is an iterable, so you can loop over the rows and access each column as shown in the code.

I hope it helps you in your big projects.

Resize the Disk of an OpenStack Instance

In my current job, we use OpenStack to test and deploy our projects locally. Last week, a colleague emailed me to say that he could not install any software on his OpenStack instance.

When I checked the instance using the following command:

df -h --total

I found that its root disk was full, which is why he could not install any software. With some help from my friend Google, I found the following article (https://access.redhat.com/solutions/2151971) on resizing the Cinder volume of an OpenStack instance. I followed these steps:

  1. Stop the instance (nova stop <instance id>)
  2. Reset the state of the volume to available (cinder reset-state --state available <cinder volume id>)
  3. Extend the volume size (cinder extend <cinder volume id> <size in GB>)
  4. Change the status of the volume to in-use (cinder reset-state --state in-use <cinder volume id>)
  5. Start the instance (nova start <instance id>)
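
The five steps can also be assembled in a small script. This is only a sketch: the function name resize_volume_commands and the ids below are made up for illustration, and the commands are printed rather than executed so that each one can be reviewed first.

```python
def resize_volume_commands(instance_id, volume_id, new_size_gb):
    """Return the five CLI commands from the steps above, in order."""
    return [
        ["nova", "stop", instance_id],
        ["cinder", "reset-state", "--state", "available", volume_id],
        ["cinder", "extend", volume_id, str(new_size_gb)],
        ["cinder", "reset-state", "--state", "in-use", volume_id],
        ["nova", "start", instance_id],
    ]


# Hypothetical ids and size, used only for illustration.
commands = resize_volume_commands("my-instance-id", "my-volume-id", 40)
for cmd in commands:
    # Print instead of running; to execute a step, pass the list to
    # subprocess.run(cmd, check=True).
    print(" ".join(cmd))
```

Keeping each command as a list of arguments means it can be handed to subprocess.run without going through a shell.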

After that, I could see that the disk size had increased, but Ubuntu still reported the disk as full. Then I found another article (https://devops.ionos.com/tutorials/increase-the-size-of-a-linux-root-partition-without-rebooting/) on extending the root partition in Ubuntu. I did the following steps:

  1. Use the fdisk utility on your root disk (fdisk /dev/vda)
  2. Delete the primary partition by pressing "d"
  3. Recreate it using "n"
  4. For the first sector, choose the default
  5. For the last sector, give the maximum size using "+size"
  6. Write the changes by pressing "w"
  7. Now run "df -h --total" to see the effect

 

 

Import data to Elasticsearch from CSV

In a recent project, I needed to import data from a CSV file into Elasticsearch. I found a post on Stack Overflow (https://stackoverflow.com/questions/41573616/index-csv-to-elasticsearch-in-python) for this task. The post shows a way to bulk insert into Elasticsearch. However, in my case, I got a Unicode error when I tried bulk insertion.

I solved this problem by inserting the records individually. There were not many records, and the speed was not that bad. I used the following code to accomplish my task.

from elasticsearch import Elasticsearch
import csv

es = Elasticsearch(["localhost"])

with open('data_v4_cats.csv', 'r') as f:
    reader = csv.DictReader(f)
    # Index each row individually; enumerate supplies the document id.
    for i, row in enumerate(reader):
        # doc_type applies to older Elasticsearch versions; newer releases drop mapping types.
        res1 = es.index(index='categories', doc_type='category', id=i, body=row)
        print(i, res1)
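
For comparison, bulk insertion with helpers.bulk expects an iterable of action dictionaries instead of one es.index call per row. The sketch below only builds those actions from CSV rows, so it runs without an Elasticsearch server. The function name csv_to_actions and the sample columns are made up for illustration, and reading the file with an explicit encoding may or may not resolve the Unicode error described above.

```python
import csv
import io


def csv_to_actions(lines, index_name):
    """Yield one bulk action per CSV row, using the row number as the id."""
    reader = csv.DictReader(lines)
    for i, row in enumerate(reader):
        yield {"_index": index_name, "_id": i, "_source": dict(row)}


# Stand-in for open('data_v4_cats.csv', 'r', encoding='utf-8');
# the column names and values here are invented sample data.
sample = io.StringIO("name,parent\nCafé,Food\nBooks,Media\n")
actions = list(csv_to_actions(sample, "categories"))
for action in actions:
    print(action)
```

The resulting list (or the generator itself) is what would be passed as the second argument to helpers.bulk(es, actions).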

Update/refresh static data frame in streaming context

Last week, I was working on a streaming application that needed to update all static data frames in the streaming context. I was stuck the whole week and could not find a solution on the internet. One person suggested restarting the streaming queries after updating the static data frames, but did not explain how to do it.

I tried the query restart solution, and the code can be seen below:

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
from pyspark.sql.types import *
import pyspark.sql.functions as func
import time




spark = SparkSession \
    .builder \
    .appName("StructuredNetworkWordCount") \
    .getOrCreate()

data = ["test", "spark"]
static_df = spark.createDataFrame(data,StringType())
static_df.show()



lines = spark \
    .readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

# Split the lines into words
words = lines.select(
   explode(
       split(lines.value, " ")
   ).alias("word")
)

# Join the streamed words with the static data frame
wordCounts = words.join(static_df, func.expr("""value=word"""))

query = wordCounts \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .start()


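while True:
    # Rebuild the static data frame with the updated data.
    data = ["test", "spark", "hadoop"]
    static_df = spark.createDataFrame(data, StringType())
    static_df.show()
    # Stop the running query, redo the join against the new static
    # data frame, and start a fresh query.
    query.stop()
    wordCounts = words.join(static_df, func.expr("""value=word"""))
    query = wordCounts \
        .writeStream \
        .outputMode("append") \
        .format("console") \
        .start()
    # Wait a minute before the next refresh.
    time.sleep(60)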

The important point is that after stopping the query, you must apply all transformations that involve the static data frame again. Otherwise, the streaming query keeps using the old static data frame even after you restart it. That is why I repeat the join statement after stopping the streaming query.

Use of Vertical Partitioning Algorithms to Create Column Families in HBase

HBase is a NoSQL database. It stores data as key-value pairs, where the key uniquely identifies a row and each value is further divided into multiple column families. To read a subset of columns, HBase reads the entire column families that contain the referenced columns into memory and then discards the unnecessary columns. To reduce the number of column families read by a query, it is very important to define the HBase schema carefully. If the schema is created without considering the workload, it can produce an inefficient layout that increases query execution time. Many approaches to HBase schema design already exist, but in this post I present the use of vertical partitioning algorithms to create HBase column families.

There are two main challenges in defining a table in HBase:

  1. Defining a row key that covers most of the queries in the workload
  2. Defining column families that let queries read as little data as possible

Defining Row Key:

To define the row key, I propose extracting all columns that are used in filters and ranking them by usage. Take the top one or two that cover the majority of the queries and combine them with the primary key of the table. For example, in the TPCH benchmark, the lineitem table is used in "Q1, Q3, Q4, Q5, Q6, Q7, Q8, Q9, Q10, Q12, Q14, Q15, Q17, Q18, Q19, Q20, Q21". In the filters, L_SHIPDATE is used 8 times, and L_COMMITDATE, L_RECEIPTDATE and L_QUANTITY are used 2 times each. Following my proposal, I choose L_SHIPDATE and combine it with the primary key (L_ORDERKEY, L_LINENUMBER) to make a composite row key for the HBase table. By doing this, we can take advantage of key-based filtering in 8 queries.
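
The ranking step can be sketched in a few lines of Python. The usage counts are the ones quoted above; the function name composite_row_key and the top_n parameter are made up for illustration, and this is only one simple way to express the proposal.

```python
from collections import Counter

# Filter usage counts for the lineitem table, as quoted in the text above.
filter_usage = Counter({
    "L_SHIPDATE": 8,
    "L_COMMITDATE": 2,
    "L_RECEIPTDATE": 2,
    "L_QUANTITY": 2,
})
primary_key = ["L_ORDERKEY", "L_LINENUMBER"]


def composite_row_key(usage, primary_key, top_n=1):
    """Put the top_n most-used filter columns in front of the primary key."""
    top_columns = [column for column, _ in usage.most_common(top_n)]
    # Skip primary key columns that were already picked as filter columns.
    return top_columns + [c for c in primary_key if c not in top_columns]


row_key = composite_row_key(filter_usage, primary_key)
print(row_key)
```

With top_n=1 this selects L_SHIPDATE followed by the primary key columns, which is the composite key used in the rest of this post.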

Defining Column Families:

The database world already has many algorithms for defining vertical partitions. Vertical partitions are similar to column groups, and they can be used as column families in HBase. A VLDB 2013 paper titled "A Comparison of Knives for Bread Slicing" compared the existing vertical partitioning algorithms. However, the authors did not evaluate these algorithms for HBase. It would be interesting to compare their performance in HBase to see how useful these algorithms are in that setting.

I applied five algorithms (AutoPart, HillClimb, O2P, HYRISE, NAVATHE), considering only the lineitem table and, as the workload, all TPCH queries that involve it. The table below shows the column groups for every algorithm.

Algorithm Name   Column Groups

AutoPart         G1: {L_LINESTATUS, L_TAX}
                 G2: {L_ORDERKEY}
                 G3: {L_PARTKEY}
                 G4: {L_SUPPKEY}
                 G5: {L_LINENUMBER, L_COMMENT}
                 G6: {L_QUANTITY}
                 G7: {L_RECEIPTDATE, L_COMMITDATE}
                 G8: {L_RETURNFLAG}
                 G9: {L_SHIPDATE}
                 G10: {L_DISCOUNT, L_EXTENDEDPRICE}
                 G11: {L_SHIPINSTRUCT}
                 G12: {L_SHIPMODE}

HillClimb        G1: {L_ORDERKEY}
                 G2: {L_PARTKEY}
                 G3: {L_SUPPKEY}
                 G4: {L_LINENUMBER}
                 G5: {L_QUANTITY}
                 G6: {L_DISCOUNT, L_EXTENDEDPRICE}
                 G7: {L_LINESTATUS, L_TAX}
                 G8: {L_RETURNFLAG}
                 G9: {L_SHIPDATE}
                 G10: {L_RECEIPTDATE, L_COMMITDATE}
                 G11: {L_SHIPINSTRUCT}
                 G12: {L_SHIPMODE}
                 G13: {L_COMMENT}

HYRISE           G1: {L_RECEIPTDATE, L_COMMITDATE}
                 G2: {L_PARTKEY}
                 G3: {L_SHIPMODE}
                 G4: {L_SHIPINSTRUCT}
                 G5: {L_SHIPDATE}
                 G6: {L_SUPPKEY}
                 G7: {L_DISCOUNT}
                 G8: {L_EXTENDEDPRICE}
                 G9: {L_RETURNFLAG}
                 G10: {L_COMMENT, L_LINENUMBER}
                 G11: {L_LINESTATUS, L_TAX}
                 G12: {L_ORDERKEY}
                 G13: {L_QUANTITY}

NAVATHE          G1: {L_COMMENT, L_LINESTATUS}
                 G2: {L_RETURNFLAG}
                 G3: {L_SHIPMODE}
                 G4: {L_RECEIPTDATE, L_COMMITDATE}
                 G5: {L_SUPPKEY}
                 G6: {L_DISCOUNT, L_EXTENDEDPRICE}
                 G7: {L_SHIPDATE}
                 G8: {L_QUANTITY}
                 G9: {L_PARTKEY}
                 G10: {L_SHIPINSTRUCT}
                 G11: {L_TAX}
                 G12: {L_LINENUMBER}
                 G13: {L_ORDERKEY}

O2P              G1: {L_COMMENT, L_LINESTATUS}
                 G2: {L_RETURNFLAG}
                 G3: {L_SHIPMODE}
                 G4: {L_COMMITDATE}
                 G5: {L_RECEIPTDATE}
                 G6: {L_SUPPKEY}
                 G7: {L_DISCOUNT, L_EXTENDEDPRICE}
                 G8: {L_SHIPDATE}
                 G9: {L_QUANTITY, L_PARTKEY}
                 G10: {L_SHIPINSTRUCT}
                 G11: {L_TAX}
                 G12: {L_LINENUMBER}
                 G13: {L_ORDERKEY}

Theoretical Evaluation:

Whenever we read a subset of columns, we have to read all column families that contain those columns. We can therefore evaluate a layout by counting how many column families (column groups) each query has to read.

Query          AutoPart   HillClimb   O2P   HYRISE   NAVATHE
Q3 of TPCH     2          2           2     3        2
Q12 of TPCH    3          3           4     3        3

Q3 of TPCH:

select l_orderkey, sum(l_extendedprice * (1 - l_discount)) as revenue from LINEITEM where l_shipdate > date '1995-03-13' group by l_orderkey limit 10;

Q12 of TPCH:

select l_shipmode from LINEITEM where l_shipmode in ('RAIL', 'FOB') and l_commitdate < l_receiptdate and l_shipdate = date '1997-01-01' and l_receiptdate < date '1997-01-01' + interval '1' year group by l_shipmode order by l_shipmode;

The table above covers two TPCH queries, for which I manually counted the column families that each algorithm's layout has to read. I am not favoring any algorithm; it is up to you to decide which one works better in your environment. However, I do recommend using one of these algorithms when designing the schema of an HBase table. It can propose an efficient schema based on your workload.
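
The manual count can also be scripted. The sketch below counts the column groups touched by the columns that Q12 references, using the AutoPart and O2P groups listed earlier. The function name groups_read is made up for illustration, and this simple count treats every referenced column the same way; it does not model columns that could be served from the row key.

```python
# Column groups copied from the AutoPart and O2P entries listed earlier.
AUTOPART = [
    {"L_LINESTATUS", "L_TAX"}, {"L_ORDERKEY"}, {"L_PARTKEY"}, {"L_SUPPKEY"},
    {"L_LINENUMBER", "L_COMMENT"}, {"L_QUANTITY"},
    {"L_RECEIPTDATE", "L_COMMITDATE"}, {"L_RETURNFLAG"}, {"L_SHIPDATE"},
    {"L_DISCOUNT", "L_EXTENDEDPRICE"}, {"L_SHIPINSTRUCT"}, {"L_SHIPMODE"},
]
O2P = [
    {"L_COMMENT", "L_LINESTATUS"}, {"L_RETURNFLAG"}, {"L_SHIPMODE"},
    {"L_COMMITDATE"}, {"L_RECEIPTDATE"}, {"L_SUPPKEY"},
    {"L_DISCOUNT", "L_EXTENDEDPRICE"}, {"L_SHIPDATE"},
    {"L_QUANTITY", "L_PARTKEY"}, {"L_SHIPINSTRUCT"}, {"L_TAX"},
    {"L_LINENUMBER"}, {"L_ORDERKEY"},
]

# Columns referenced by Q12 as written above.
Q12_COLUMNS = {"L_SHIPMODE", "L_COMMITDATE", "L_RECEIPTDATE", "L_SHIPDATE"}


def groups_read(column_groups, query_columns):
    """Count the column groups that hold at least one referenced column."""
    return sum(1 for group in column_groups if group & query_columns)


autopart_q12 = groups_read(AUTOPART, Q12_COLUMNS)
o2p_q12 = groups_read(O2P, Q12_COLUMNS)
print("AutoPart:", autopart_q12)
print("O2P:", o2p_q12)
```

For Q12 this gives 3 groups for AutoPart and 4 for O2P, because O2P keeps L_COMMITDATE and L_RECEIPTDATE in separate groups.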

Actual Evaluation:

I am still working on evaluating these algorithms in HBase. I have created a few scripts, which I am sharing here. If you get a chance to run them, please share the results with me.

I am planning to do the evaluation using Hive with HBase.

First, we create the lineitem table in Hive as an external table. It will be used to import the data into the HBase tables.

CREATE EXTERNAL TABLE lineitem (
L_ORDERKEY string,
L_PARTKEY string,
L_SUPPKEY string,
L_LINENUMBER string,
L_QUANTITY string,
L_EXTENDEDPRICE string,
L_DISCOUNT string,
L_TAX string,
L_RETURNFLAG string,
L_LINESTATUS string,
L_SHIPDATE string,
L_COMMITDATE string,
L_RECEIPTDATE string,
L_SHIPINSTRUCT string,
L_SHIPMODE string,
L_COMMENT string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' STORED AS TEXTFILE LOCATION '/input/';

Second, we create an HBase table for every algorithm.

create table lineitem_autopart(
key struct<L_SHIPDATE:string, L_ORDERKEY:string, L_LINENUMBER:string>,
L_LINESTATUS string, L_TAX string,
L_ORDERKEY string,
L_PARTKEY string,
L_SUPPKEY string,
L_LINENUMBER string, L_COMMENT string,
L_QUANTITY string,
L_RECEIPTDATE string, L_COMMITDATE string,
L_RETURNFLAG string,
L_SHIPDATE string,
L_DISCOUNT string, L_EXTENDEDPRICE string,
L_SHIPINSTRUCT string,
L_SHIPMODE string
)
ROW FORMAT DELIMITED COLLECTION ITEMS TERMINATED BY '~'
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ('hbase.columns.mapping' = ':key,f1:L_LINESTATUS,f1:L_TAX,f2:L_ORDERKEY,f3:L_PARTKEY,f4:L_SUPPKEY,f5:L_LINENUMBER,f5:L_COMMENT,f6:L_QUANTITY,f7:L_RECEIPTDATE,f7:L_COMMITDATE,f8:L_RETURNFLAG,f9:L_SHIPDATE,f10:L_DISCOUNT,f10:L_EXTENDEDPRICE,f11:L_SHIPINSTRUCT,f12:L_SHIPMODE')
TBLPROPERTIES ('hbase.table.name' = 'lineitem_autopart');

The command above creates an HBase table with a composite key and maps every column to a column family.
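
Since a similar table is needed for every algorithm, the hbase.columns.mapping string can be generated from the column groups instead of being typed by hand. This is a small sketch; the function name hbase_columns_mapping is made up for illustration, and it assumes the same f1, f2, ... family naming used in the statement above.

```python
# AutoPart column groups, in the order used by the table definition above.
AUTOPART = [
    ["L_LINESTATUS", "L_TAX"],
    ["L_ORDERKEY"],
    ["L_PARTKEY"],
    ["L_SUPPKEY"],
    ["L_LINENUMBER", "L_COMMENT"],
    ["L_QUANTITY"],
    ["L_RECEIPTDATE", "L_COMMITDATE"],
    ["L_RETURNFLAG"],
    ["L_SHIPDATE"],
    ["L_DISCOUNT", "L_EXTENDEDPRICE"],
    ["L_SHIPINSTRUCT"],
    ["L_SHIPMODE"],
]


def hbase_columns_mapping(column_groups):
    """Map group number i to column family f<i>, with ':key' first for the row key."""
    parts = [":key"]
    for i, group in enumerate(column_groups, start=1):
        parts.extend("f{}:{}".format(i, column) for column in group)
    return ",".join(parts)


mapping = hbase_columns_mapping(AUTOPART)
print(mapping)
```

The order of the mapping entries has to match the order of the columns in the create table statement, so the same group list can drive both.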

Third, we load the data into every HBase table.

FROM lineitem INSERT INTO TABLE lineitem_autopart SELECT named_struct('L_SHIPDATE', L_SHIPDATE, 'L_ORDERKEY', L_ORDERKEY, 'L_LINENUMBER', L_LINENUMBER),
L_LINESTATUS, L_TAX,
L_ORDERKEY,
L_PARTKEY,
L_SUPPKEY,
L_LINENUMBER, L_COMMENT,
L_QUANTITY,
L_RECEIPTDATE, L_COMMITDATE,
L_RETURNFLAG,
L_SHIPDATE,
L_DISCOUNT, L_EXTENDEDPRICE,
L_SHIPINSTRUCT,
L_SHIPMODE;

Finally, we can run a query on the loaded HBase tables:

select l_returnflag, l_linestatus, sum(l_quantity) as sum_qty, sum(l_extendedprice) as sum_base_price, sum(l_extendedprice*(1-l_discount)) as sum_disc_price, sum(l_extendedprice*(1-l_discount)*(1+l_tax)) as sum_charge, avg(l_quantity) as avg_qty, avg(l_extendedprice) as avg_price, avg(l_discount) as avg_disc, count(*) as count_order from lineitem_autopart where key.l_shipdate <= '1998-09-02' group by l_returnflag, l_linestatus order by l_returnflag, l_linestatus;

In this way, we can measure the query execution time for every algorithm and compare their effectiveness in HBase.

Query HBase using Apache Drill

Hi guys,

Today, we will see how to query HBase using the Apache Drill SQL interface. Apache Drill provides an ANSI SQL interface for querying data from any data source. We will use it to query HBase.

If you don't have HBase and Apache Drill on your machine, follow the links below before going further.

a) https://ranafaisal.wordpress.com/2015/05/13/hbase-insallation-on-ubuntu-14-04/

b) https://ranafaisal.wordpress.com/2015/05/13/install-apache-drill-on-ubuntu-14-04/

 

Now let's start

1) Run the Apache Drill shell "bin/sqlline -u jdbc:drill:zk=local"

2) Open the web UI to enable the HBase driver; the URL of the web UI is "http://localhost:8047"

3) Now click on the storage link

4) In storage, find hbase and click on update

5) Add the following info in the textbox

{
  "type": "hbase",
  "config": {
    "hbase.zookeeper.quorum": "localhost",
    "hbase.zookeeper.property.clientPort": "2181"
  },
  "enabled": true
}

6) Then click the Update button to enable this driver

7) In the Apache Drill shell, run "show databases;"; this will also display the HBase databases.

8) Now let's query the clicks table with "select * from hbase.clicks;"; it will display all the click information.

9) You need to cast the HBase columns to varchar to see their correct values, like

SELECT CAST(clicks.row_key as VarChar(20)), CAST(clicks.clickinfo.studentid as VarChar(20)), CAST (clicks.clickinfo.url as VarChar(20)), CAST (clicks.iteminfo.quantity as VarChar(20)), CAST (clicks.iteminfo.itemtype as VarChar(20)) FROM hbase.clicks;

10) Now let's join the students and clicks tables

select cast(s.account.name as varchar(20)) as name, cast(c.clickinfo.url as varchar(100)) as url from hbase.students as s 

join hbase.clicks as c

on cast(s.row_key as varchar(20)) = cast(c.clickinfo.studentid as varchar(20));
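
Writing the CAST for every column by hand, as in step 9, gets tedious for wide tables, so the query text can be generated. This is only a sketch: the function name cast_select is made up for illustration, and it just builds the SQL string; running it against Drill is left out.

```python
def cast_select(table, columns, length=20):
    """Build a Drill query that casts the row key and each family.column to VARCHAR."""
    # Drill lets us refer to the table by its last name part, e.g. clicks for hbase.clicks.
    alias = table.split(".")[-1]
    exprs = ["CAST({}.row_key AS VARCHAR({}))".format(alias, length)]
    exprs += [
        "CAST({}.{} AS VARCHAR({}))".format(alias, column, length)
        for column in columns
    ]
    return "SELECT {} FROM {};".format(", ".join(exprs), table)


query = cast_select("hbase.clicks", ["clickinfo.studentid", "clickinfo.url"])
print(query)
```

The generated statement follows the same pattern as the hand-written query in step 9.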

 

Cheers

Query MongoDB using Apache Drill

Hi guys,

Today, we will see how to query MongoDB using the Apache Drill SQL interface. Apache Drill provides an ANSI SQL interface for querying data from any data source. We will use it to query MongoDB.

If you don't have MongoDB and Apache Drill on your machine, follow the links below before going further.

a) https://ranafaisal.wordpress.com/2015/05/13/install-mongodb-on-ubuntu-14-04/

b) https://ranafaisal.wordpress.com/2015/05/13/install-apache-drill-on-ubuntu-14-04/

 

Now let's start

1) Run the Apache Drill shell "bin/sqlline -u jdbc:drill:zk=local"

2) Open the web UI to enable the MongoDB driver; the URL of the web UI is "http://localhost:8047"

3) Now click on the storage link

4) In storage, find mongodb and click on update

5) Add the following info in the textbox

{
  "type": "mongo",
  "connection": "mongodb://localhost:27017/",
  "enabled": true
}

6) Then click the Update button to enable this driver

7) In the Apache Drill shell, run "show databases;"; this will also display the MongoDB databases.

8) Now let's query the zips collection with "select * from mongo.mydb.zips;"; it will display all the zip codes saved in the zips collection.

Cheers

Install Apache Drill on Ubuntu 14.04

Hi guys,

Today, we are going to install Apache Drill. It is a framework for running ad-hoc queries on any data source. These data sources can be MongoDB, HBase, CSV files, JSON files, etc. Let's start with its installation.

First, install Oracle JDK 1.7 on your machine by following this link [https://www.digitalocean.com/community/tutorials/how-to-install-java-on-ubuntu-with-apt-get]

 

1) Download it on Ubuntu using "wget http://getdrill.org/drill/download/apache-drill-0.9.0.tar.gz"

2) Create a directory for its installation: "sudo mkdir -p /opt/drill"

3) Extract it into the installation directory: "sudo tar -xvzf apache-drill-0.9.0.tar.gz -C /opt/drill"

4) Open its directory: "cd /opt/drill/apache-drill-0.9.0"

5) Run it using "bin/sqlline -u jdbc:drill:zk=local"

6) Let's query a JSON file: download the file [http://media.mongodb.org/zips.json?_ga=1.139282992.2048111731.1429111258] and save it to "/home/yourusername/zips.json"

7) Query this file using "SELECT * from dfs.`/home/yourusername/zips.json`"

Cheers

Install MongoDB on Ubuntu 14.04

Hi guys,

Today, we will see how to install MongoDB on your machine. Please follow the steps below.

1) Run this command: "sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv 7F0CEB10"

2) Now run this command:

echo "deb http://repo.mongodb.org/apt/ubuntu "$(lsb_release -sc)"/mongodb-org/3.0 multiverse" | sudo tee /etc/apt/sources.list.d/mongodb.list

3) This adds the MongoDB repository URL to our Ubuntu repository list.

4) Now update the package list using "sudo apt-get update"

5) Install MongoDB using "sudo apt-get install mongodb-org"

6) Start the service: "sudo service mongod start"

7) Congratulations, MongoDB is installed; now open the shell using "mongo"

8) For shell commands, read the following tutorial [http://docs.mongodb.org/manual/tutorial/getting-started-with-the-mongo-shell/]

9) To load dummy data for your experiments, download this JSON file [http://media.mongodb.org/zips.json?_ga=1.139282992.2048111731.1429111258]

10) Import it using "mongoimport --db mydb --collection zips --file zips.json"

Cheers

HBase Installation on Ubuntu 14.04

Hi guys,

Today, I am going to install HBase on my system. I am going to install it on a standalone machine without Hadoop. Let's start.

1) Download and install Ubuntu 14.04 on your machine or in a virtual machine

2) Install Oracle JDK 1.7 on your machine by following this link [https://www.digitalocean.com/community/tutorials/how-to-install-java-on-ubuntu-with-apt-get]

3) Download the HBase tar file from this link [http://www.apache.org/dyn/closer.cgi/hbase/]

4) Extract it using "tar -xvf hbase-1.0.1-bin.tar.gz"

5) Create a directory using "sudo mkdir /usr/lib/hbase"

6) Move your HBase folder to this directory using "sudo mv hbase-1.0.1 /usr/lib/hbase/hbase-1.0.1"

7) In the HBase directory, you will find hbase-env.sh inside the conf directory; open it in any text editor

8) Search for "export JAVA_HOME" and change it to "export JAVA_HOME=/usr/lib/jvm/java-7-oracle", then save the file

9) Now add the HBase path to your environment variables using "gedit ~/.bashrc"

10) Add the lines below at the end of the .bashrc file and save it

export HBASE_HOME=/usr/lib/hbase/hbase-1.0.1

export PATH=$PATH:$HBASE_HOME/bin

11) Run the following command to make these changes effective: ". ~/.bashrc"

12) Now open conf/hbase-site.xml in a text editor and add the text below to it

<?xml version="1.0"?>

<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>

<property>

<name>hbase.rootdir</name>

<value>file:///home/hduser/HBASE/hbase</value>

</property>

<property>

<name>hbase.zookeeper.property.dataDir</name>

<value>/home/hduser/HBASE/zookeeper</value>

</property>

</configuration>

13) Congratulations, you installed hbase on your system, now start it using "sudo bin/start-hbase.sh"
14) Open hbase shell using "sudo bin/hbase shell"

15) Insert sample data in hbase, please follow this link [https://cwiki.apache.org/confluence/display/DRILL/Querying+HBase]

16) To use hbase shell follow this link [http://akbarahmed.com/2012/08/13/hbase-command-line-tutorial/]

Cheers