Eva's Photo Album
  • Year
    • 2024
      • Europe
      • Eastern Caribbean Cruise
      • US
    • 2023
      • Asia
      • Aruba & Curacao
      • Big Island, HI
      • US
    • 2022
      • Italy
      • French Polynesia
      • Rwanda + Uganda
      • US
    • 2021
      • Bermuda
      • Caucasus
      • US
    • 2020
      • Hong Kong
      • Utah
      • Colorado
      • Europe
      • California + Pacific Northwest, US
    • 2019
      • Guianas
      • Europe
      • Caribbean
      • Asia
      • US
    • 2018
      • Round the World - West Bound
      • Iceland
      • Round the World - East Bound
      • US
    • 2017
      • Guatemala
      • Romania & London
      • Asia
      • Middle East & Germany
      • Weekend happneings
    • 2016
      • Europe
      • Asia
      • Central America
      • Iran
      • US
    • 2015
      • Central America
      • National Park
      • Europe
      • Asia
      • California, US
    • 2014
      • Baltic
      • China
      • California, US
      • Central America
    • 2013
      • Myanmar + Vietnam
      • Central Europe & Balkans Peninsula
      • US + Bolivia
      • Asia
      • China
    • 2012
      • Laos
      • US + Colombia
      • Nepal
      • Asia
      • China
    • 2011
      • Cuba + US
      • Greece
      • Jordan
      • Asia
      • China
    • 2010
      • Indonesia
      • Thailand
      • US Trips
      • Asia
    • 2009
      • California
      • India
      • Life in Asia
      • Middle East + Europe
      • Weekend happneings
    • 2008
      • South America
      • Chile
      • Alaska
      • Weekend Happenings
      • Winter
    • 2007
      • South America
      • Asia
      • Fall
      • Africa
      • Spring
    • 2006
      • Spain + UK
      • Hawaii
      • Belize
      • Summer
      • Winter
    • 2005
      • Tibet
      • Summer
      • Spring
      • Winter
    • 2004
      • Fall
      • Silk Road
      • Summer
      • Spring
      • Winter
    • 2003
      • Fall
      • Europe
      • Spring
    • 2002
      • Europe
      • Summer
      • Winter
    • 2001
      • Fall
      • Summer
      • Spring
    • 2000
      • Asia
      • Fall
      • Spring
      • Winter
    • 1999
      • Fall
      • Summer
      • Spring
      • Winter
    • 1998
      • Hong Kong
      • Fall
      • Summer
      • Spring
      • Winter
    • 1997
      • Fall
      • Summer
  • About Me
  • Contact
  • Follow @tangkk

Data Aggregation like SQL in R, Python Pandas & PySpark DataFrame

October 9, 2018

  • The Data - shopping dataset
  • Group By
  • Order By
  • Union/Append two data frames
  • Intersection of two data frames
  • Join / Merge
    • Inner join
    • Left join
    • Right join
    • Outer join
    • Cross join
  • Advance SQL
    • R’s dplyr vs SQL
    • Example 1 - select a * b as c
    • Example 2 - count
    • Example 3 - count distinct
    • Example 4 - count distinct, group by, having
    • Example 5 - count distinct
    • Example 6 - cumsum, % of total within group by
    • Example 7 - sum(case when else end), group by

The Data - shopping dataset

  • Customer - contains customers detail
  • Product - contains product detail
  • Purchase - contains customers’ purchase detail

In R

customer = data.frame(CustomerId = c(1:6), Name = c("Angela", "Diana", "Cathy", 
    rep("Brandon", 2), "Eric"), State = c("CA", "TX", "IL", "AK", "CA", "AZ"))
print(customer)
##   CustomerId    Name State
## 1          1  Angela    CA
## 2          2   Diana    TX
## 3          3   Cathy    IL
## 4          4 Brandon    AK
## 5          5 Brandon    CA
## 6          6    Eric    AZ
product = data.frame(ProductId = c(1:4), Product = c("iPhone6", "iPhone6", "iWatch", 
    "MacBook"), Size = c(32, 64, NA, 256), Price = c(649, 749, 399, 1299))
print(product)
##   ProductId Product Size Price
## 1         1 iPhone6   32   649
## 2         2 iPhone6   64   749
## 3         3  iWatch   NA   399
## 4         4 MacBook  256  1299
purchase = data.frame(PurchaseId = c(1:7, 7), CustomerId = c(1, 2, 3, 3, 4, 
    4, 6, 6), ProductId = c(rep(1, 3), rep(2, 1), rep(3, 3), 1), Qty = c(1, 
    2, 3, 1, 2, 1, 1, 4))
print(purchase)
##   PurchaseId CustomerId ProductId Qty
## 1          1          1         1   1
## 2          2          2         1   2
## 3          3          3         1   3
## 4          4          3         2   1
## 5          5          4         3   2
## 6          6          4         3   1
## 7          7          6         3   1
## 8          7          6         1   4

In Python

import numpy as np
import pandas as pd
from numpy import nan
CustomerId = range(1,7,1)
Name = ["Angela", "Diana", "Cathy", "Brandon", "Brandon", "Eric"]
State = ["CA", "TX", "IL", "AK", "CA", "AZ"]
customer = pd.DataFrame(np.column_stack([CustomerId, Name, State]), columns=['CustomerId', 'Name', 'State'])
print(customer)
ProductId = range(1,5,1)
Product = ["iPhone6", "iPhone6", "iWatch", "MacBook"]
Size = [32, 64, nan, 256]
Price = [649, 749, 399, 1299]
product = pd.DataFrame(np.column_stack([ProductId, Product, Size, Price]), columns=['ProductId', 'Product', 'Size', 'Price'])
product['ProductId'] = product['ProductId'].astype(int)
product['Size'] = product['Size'].astype(float)
product['Price'] = product['Price'].astype(float)
print(product)
PurchaseId = [1,2,3,4,5,6,7,7]
CustomerId = [1,2,3,3,4,4,6,6]
ProductId = [1,1,1,2,3,3,3,1]
Qty = [1,2,3,1,2,1,1,4]
purchase = pd.DataFrame(np.column_stack([PurchaseId, CustomerId, ProductId, Qty]), columns=['PurchaseId', 'CustomerId', 'ProductId', 'Qty'])
print(purchase)
##   CustomerId     Name State
## 0          1   Angela    CA
## 1          2    Diana    TX
## 2          3    Cathy    IL
## 3          4  Brandon    AK
## 4          5  Brandon    CA
## 5          6     Eric    AZ
##    ProductId  Product   Size   Price
## 0          1  iPhone6   32.0   649.0
## 1          2  iPhone6   64.0   749.0
## 2          3   iWatch    NaN   399.0
## 3          4  MacBook  256.0  1299.0
##    PurchaseId  CustomerId  ProductId  Qty
## 0           1           1          1    1
## 1           2           2          1    2
## 2           3           3          1    3
## 3           4           3          2    1
## 4           5           4          3    2
## 5           6           4          3    1
## 6           7           6          3    1
## 7           7           6          1    4

In PySpark

customerDF = spark.createDataFrame(customer)
customerDF.show()
customerDF.cache()
productDF = spark.createDataFrame(product)
productDF.show()
productDF.cache()
purchaseDF = spark.createDataFrame(purchase)
purchaseDF.show()
purchaseDF.cache()

Group By

Summarize data (count, mean, standard devication, etc) by group

Find the number of models, average and maximum price by product.

In R

Using dplyr

library(dplyr)
group_by(product, Product) %>% summarise(num_model=length(unique(Size)), mean_price=mean(Price), max_price=max(Price))
## # A tibble: 3 x 4
##   Product num_model mean_price max_price
##   <fct>       <int>      <dbl>     <dbl>
## 1 iPhone6         2        699       749
## 2 iWatch          1        399       399
## 3 MacBook         1       1299      1299

Using doBy

doBy calculates the same summary functions for all selected variables or non group by numerical variables

library(doBy)

summaryBy(Size + Price ~ Product, data=product, FUN=list(l=length, avg=mean, max=max))
##   Product Size.length Price.length Size.mean Price.mean Size.max Price.max
## 1 iPhone6           2            2        48        699       64       749
## 2  iWatch           1            1        NA        399       NA       399
## 3 MacBook           1            1       256       1299      256      1299
summaryBy(. ~ Product, data=product, FUN=list(l=length, m=mean, max=max))
##   Product ProductId.length Size.length Price.length ProductId.mean
## 1 iPhone6                2           2            2            1.5
## 2  iWatch                1           1            1            3.0
## 3 MacBook                1           1            1            4.0
##   Size.mean Price.mean ProductId.max Size.max Price.max
## 1        48        699             2       64       749
## 2        NA        399             3       NA       399
## 3       256       1299             4      256      1299

In Python

grouped = product.groupby('Product').agg({'Size':{'num_model':pd.Series.nunique}, 'Price':{'mean_price': 'mean', 'max_price': 'max'}})
print(grouped)
## C:\Users\tangkk\AppData\Local\Programs\Python\Python37\lib\site-packages\pandas\core\groupby\groupby.py:4656: FutureWarning: using a dict with renaming is deprecated and will be removed in a future version
##   return super(DataFrameGroupBy, self).aggregate(arg, *args, **kwargs)
##              Size      Price          
##         num_model mean_price max_price
## Product                               
## MacBook       1.0     1299.0    1299.0
## iPhone6       2.0      699.0     749.0
## iWatch        0.0      399.0     399.0
grouped = product.groupby('Product').agg({'Size':pd.Series.nunique, 'Price':['mean', 'max']})
print(grouped)
##            Size   Price        
##         nunique    mean     max
## Product                        
## MacBook     1.0  1299.0  1299.0
## iPhone6     2.0   699.0   749.0
## iWatch      0.0   399.0   399.0

In PySpark

from pyspark.sql import functions as F
grouped = productDF.groupBy("Product").agg(F.countDistinct("Size").alias("num_model"), F.avg("Price").alias("mean_price"), F.max("Price").alias("max_price"))
grouped.show()

Order By

In R

arrange(customer, Name, desc(State))
##   CustomerId    Name State
## 1          1  Angela    CA
## 2          5 Brandon    CA
## 3          4 Brandon    AK
## 4          3   Cathy    IL
## 5          2   Diana    TX
## 6          6    Eric    AZ

In Python

print(customer.sort_values(by=['Name', 'State'], ascending=[1, 0]))
##   CustomerId     Name State
## 0          1   Angela    CA
## 4          5  Brandon    CA
## 3          4  Brandon    AK
## 2          3    Cathy    IL
## 1          2    Diana    TX
## 5          6     Eric    AZ

In PySpark

customerDF.orderBy(['Name', 'State'], ascending=[1, 0]).show()

Union/Append two data frames

In R

customer2 = data.frame(CustomerId=c(6:8), Name=c("Eric", "Frank", "George"), State=c("AZ" ,"ND", "SC"))

rbind(customer, customer2)
##   CustomerId    Name State
## 1          1  Angela    CA
## 2          2   Diana    TX
## 3          3   Cathy    IL
## 4          4 Brandon    AK
## 5          5 Brandon    CA
## 6          6    Eric    AZ
## 7          6    Eric    AZ
## 8          7   Frank    ND
## 9          8  George    SC

In Python

CustomerId = range(6,9,1)
Name = ["Eric", "Frank", "George"]
State = ["AZ", "ND", "SC"]
customer2 = pd.DataFrame(np.column_stack([CustomerId, Name, State]), columns=['CustomerId', 'Name', 'State'])
pd.concat([customer, customer2])
##   CustomerId     Name State
## 0          1   Angela    CA
## 1          2    Diana    TX
## 2          3    Cathy    IL
## 3          4  Brandon    AK
## 4          5  Brandon    CA
## 5          6     Eric    AZ
## 0          6     Eric    AZ
## 1          7    Frank    ND
## 2          8   George    SC

In PySpark

customer2DF = spark.createDataFrame(customer2)
customerDF.unionAll(customer2DF).collect()

or

sc.parallelize(customerDF.unionAll(customer2DF).collect()).toDF().show()

Intersection of two data frames

In R

merge(customer, customer2, by=colnames(customer2))
##   CustomerId Name State
## 1          6 Eric    AZ

In Python

pd.merge(customer, customer2, how='inner', on=['CustomerId', 'Name', 'State'])
##   CustomerId  Name State
## 0          6  Eric    AZ

In PySpark

print (customerDF.intersect(customer2DF).collect())
sc.parallelize(customerDF.intersect(customer2DF).collect()).toDF().show()

Join / Merge

Inner join

Include the name and state of customer for each purhcase.

In R

inner_join(purchase, customer, by=c("CustomerId"))
##   PurchaseId CustomerId ProductId Qty    Name State
## 1          1          1         1   1  Angela    CA
## 2          2          2         1   2   Diana    TX
## 3          3          3         1   3   Cathy    IL
## 4          4          3         2   1   Cathy    IL
## 5          5          4         3   2 Brandon    AK
## 6          6          4         3   1 Brandon    AK
## 7          7          6         3   1    Eric    AZ
## 8          7          6         1   4    Eric    AZ

In Python

print(pd.merge(customer, customer2, how='inner', on=['CustomerId', 'Name', 'State']))
##   CustomerId  Name State
## 0          6  Eric    AZ

In PySpark

customerDF.intersect(customer2DF).collect()

Left join

List all customers and their purchases.

In R

left_join(customer, purchase, by.x="CustomerId", by.y="CustomerId")
##   CustomerId    Name State PurchaseId ProductId Qty
## 1          1  Angela    CA          1         1   1
## 2          2   Diana    TX          2         1   2
## 3          3   Cathy    IL          3         1   3
## 4          3   Cathy    IL          4         2   1
## 5          4 Brandon    AK          5         3   2
## 6          4 Brandon    AK          6         3   1
## 7          5 Brandon    CA         NA        NA  NA
## 8          6    Eric    AZ          7         3   1
## 9          6    Eric    AZ          7         1   4

In Python

customer['CustomerId'] = customer['CustomerId'].astype(int)
purchase['CustomerId'] = purchase['CustomerId'].astype(int)
print(pd.merge(customer, purchase, how='left', on=['CustomerId']))
##    CustomerId     Name State  PurchaseId  ProductId  Qty
## 0           1   Angela    CA         1.0        1.0  1.0
## 1           2    Diana    TX         2.0        1.0  2.0
## 2           3    Cathy    IL         3.0        1.0  3.0
## 3           3    Cathy    IL         4.0        2.0  1.0
## 4           4  Brandon    AK         5.0        3.0  2.0
## 5           4  Brandon    AK         6.0        3.0  1.0
## 6           5  Brandon    CA         NaN        NaN  NaN
## 7           6     Eric    AZ         7.0        3.0  1.0
## 8           6     Eric    AZ         7.0        1.0  4.0

In PySpark

purchaseDF = purchaseDF.withColumnRenamed('CustomerId', 'CustID')
result = customerDF.join(purchaseDF, (customerDF.CustomerId == purchaseDF.CustID), 'left')
result.orderBy('CustomerId').show(10)

Right join

List all products and purchases including them.

In R

right_join(purchase, product, by=c("ProductId"))
##   PurchaseId CustomerId ProductId Qty Product Size Price
## 1          1          1         1   1 iPhone6   32   649
## 2          2          2         1   2 iPhone6   32   649
## 3          3          3         1   3 iPhone6   32   649
## 4          7          6         1   4 iPhone6   32   649
## 5          4          3         2   1 iPhone6   64   749
## 6          5          4         3   2  iWatch   NA   399
## 7          6          4         3   1  iWatch   NA   399
## 8          7          6         3   1  iWatch   NA   399
## 9         NA         NA         4  NA MacBook  256  1299

In Python

product['ProductId'] = product['ProductId'].astype(int)
purchase['ProductId'] = purchase['ProductId'].astype(int)
print(pd.merge(purchase, product, how='right', on='ProductId'))
##    PurchaseId  CustomerId  ProductId  Qty  Product   Size Price
## 0         1.0         1.0          1  1.0  iPhone6   32.0   649
## 1         2.0         2.0          1  2.0  iPhone6   32.0   649
## 2         3.0         3.0          1  3.0  iPhone6   32.0   649
## 3         7.0         6.0          1  4.0  iPhone6   32.0   649
## 4         4.0         3.0          2  1.0  iPhone6   64.0   749
## 5         5.0         4.0          3  2.0   iWatch    nan   399
## 6         6.0         4.0          3  1.0   iWatch    nan   399
## 7         7.0         6.0          3  1.0   iWatch    nan   399
## 8         NaN         NaN          4  NaN  MacBook  256.0  1299

In PySpark

productDF = productDF.withColumnRenamed('ProductId', 'ProdId')
result = purchaseDF.join(productDF, (purchaseDF.ProductId == productDF.ProdId), 'right')
result.select('ProdId', 'PurchaseId', 'CustID', 'Qty', 'Product', 'Size', 'Price').orderBy('ProdId').show(10)

Outer join

In R

full_join(customer, purchase)
##   CustomerId    Name State PurchaseId ProductId Qty
## 1          1  Angela    CA          1         1   1
## 2          2   Diana    TX          2         1   2
## 3          3   Cathy    IL          3         1   3
## 4          3   Cathy    IL          4         2   1
## 5          4 Brandon    AK          5         3   2
## 6          4 Brandon    AK          6         3   1
## 7          5 Brandon    CA         NA        NA  NA
## 8          6    Eric    AZ          7         3   1
## 9          6    Eric    AZ          7         1   4

In Python

print(pd.merge(customer, purchase, how='outer', on=['CustomerId']))
##    CustomerId     Name State  PurchaseId  ProductId  Qty
## 0           1   Angela    CA         1.0        1.0  1.0
## 1           2    Diana    TX         2.0        1.0  2.0
## 2           3    Cathy    IL         3.0        1.0  3.0
## 3           3    Cathy    IL         4.0        2.0  1.0
## 4           4  Brandon    AK         5.0        3.0  2.0
## 5           4  Brandon    AK         6.0        3.0  1.0
## 6           5  Brandon    CA         NaN        NaN  NaN
## 7           6     Eric    AZ         7.0        3.0  1.0
## 8           6     Eric    AZ         7.0        1.0  4.0

In PySpark

result = customerDF.join(purchaseDF, (customerDF.CustomerId == purchaseDF.CustID), 'outer')
result.select('CustomerId', 'Name', 'State', 'PurchaseId', 'ProductId', 'Qty').orderBy('CustomerId').show()

Cross join

In R

merge(product, purchase, by=NULL)
##    ProductId.x Product Size Price PurchaseId CustomerId ProductId.y Qty
## 1            1 iPhone6   32   649          1          1           1   1
## 2            2 iPhone6   64   749          1          1           1   1
## 3            3  iWatch   NA   399          1          1           1   1
## 4            4 MacBook  256  1299          1          1           1   1
## 5            1 iPhone6   32   649          2          2           1   2
## 6            2 iPhone6   64   749          2          2           1   2
## 7            3  iWatch   NA   399          2          2           1   2
## 8            4 MacBook  256  1299          2          2           1   2
## 9            1 iPhone6   32   649          3          3           1   3
## 10           2 iPhone6   64   749          3          3           1   3
## 11           3  iWatch   NA   399          3          3           1   3
## 12           4 MacBook  256  1299          3          3           1   3
## 13           1 iPhone6   32   649          4          3           2   1
## 14           2 iPhone6   64   749          4          3           2   1
## 15           3  iWatch   NA   399          4          3           2   1
## 16           4 MacBook  256  1299          4          3           2   1
## 17           1 iPhone6   32   649          5          4           3   2
## 18           2 iPhone6   64   749          5          4           3   2
## 19           3  iWatch   NA   399          5          4           3   2
## 20           4 MacBook  256  1299          5          4           3   2
## 21           1 iPhone6   32   649          6          4           3   1
## 22           2 iPhone6   64   749          6          4           3   1
## 23           3  iWatch   NA   399          6          4           3   1
## 24           4 MacBook  256  1299          6          4           3   1
## 25           1 iPhone6   32   649          7          6           3   1
## 26           2 iPhone6   64   749          7          6           3   1
## 27           3  iWatch   NA   399          7          6           3   1
## 28           4 MacBook  256  1299          7          6           3   1
## 29           1 iPhone6   32   649          7          6           1   4
## 30           2 iPhone6   64   749          7          6           1   4
## 31           3  iWatch   NA   399          7          6           1   4
## 32           4 MacBook  256  1299          7          6           1   4

No cross join function in Python

In PySpark

spark.conf.set("spark.sql.crossJoin.enabled", True), 
productDF.join(purchaseDF).show()

Advance SQL

R’s dplyr vs SQL

Advance SQL is make easy using dplyr with its %>% and simple “verbs”/functions to chain together multiple simple steps to achieve a complex result.

dplyr SQL
select(df, column, column:column, …) select
select(df, col1=column, …) select column as col1
slice(df, position:position) select top position
distinct(select(df, column, …) select distinct column
n_distinct(df, column) select count(distinct column)
count(df, column) select count(column)
mutate(df, col3 = col1 + col2, …) select col1, col2, col1 + col2 as col3
transform(df, col3 = col1 + col2, col4 = col1 + col3, …) select col1, col2, col1 + col2 as col3, col1 + col3 as col4
transmute(df, col3 = col1 + col2, col4 = col1 + col3, …) select col1 + col2 as col3, col1 + col3 as col4
filter(df, expression operator expression, …) where
inner_join(df1, df2, by=col1) select * from df1 join df2 where df1.col1 = df2.col1
left_join(df1, df2, by.x=“x”, by.y=“y”) select * from df1 left join df2 where df1.x = df2.y
right_join(df1, df2, by=c(“x”, “y”)) select * from df1 right join df2 where df1.x = df2.y
full_join(df1, df2) select * from df1 outer join df2
semi_join(df1, df2) select df1.* from df1 join df2
anti_join(df1, df2) select df1.* from df1 left join df2 where df2 is null
group_by(df, ) group by
arrange(df, column, desc(column), …) order by
summarise(df, avg=mean(col1), max=max(col2)) select mean(col1) avg, max(col2) max
summarise(n(), n_distinct(column)) select count(*), count(distinct column) as d
union(df1, df2) union
intersect(df1, df2) intersect
setdiff(df1, df2) except
sample_n(df, sample_size, replace=TRUE) …
sample_frac(df, fraction, replace=FALSE) where RAND() < fraction

Example 1 - select a * b as c

Add a variable Cost which is Purchase.Qty * Product.Price for each purchased item.

In R

library(dplyr)

inner_join(product, purchase, by="ProductId") %>% transform(cost=Qty * Price)
##   ProductId Product Size Price PurchaseId CustomerId Qty cost
## 1         1 iPhone6   32   649          1          1   1  649
## 2         1 iPhone6   32   649          2          2   2 1298
## 3         1 iPhone6   32   649          3          3   3 1947
## 4         1 iPhone6   32   649          7          6   4 2596
## 5         2 iPhone6   64   749          4          3   1  749
## 6         3  iWatch   NA   399          5          4   2  798
## 7         3  iWatch   NA   399          6          4   1  399
## 8         3  iWatch   NA   399          7          6   1  399

In Python

joined = pd.merge(product, purchase, how='inner', on='ProductId')
joined['cost'] = joined['Qty'] * joined['Price']
print(joined)
##    ProductId  Product  Size  Price  PurchaseId  CustomerId  Qty    cost
## 0          1  iPhone6  32.0  649.0           1           1    1   649.0
## 1          1  iPhone6  32.0  649.0           2           2    2  1298.0
## 2          1  iPhone6  32.0  649.0           3           3    3  1947.0
## 3          1  iPhone6  32.0  649.0           7           6    4  2596.0
## 4          2  iPhone6  64.0  749.0           4           3    1   749.0
## 5          3   iWatch   NaN  399.0           5           4    2   798.0
## 6          3   iWatch   NaN  399.0           6           4    1   399.0
## 7          3   iWatch   NaN  399.0           7           6    1   399.0

In PySpark

productDF = spark.createDataFrame(product)
purchaseDF = spark.createDataFrame(purchase)
resultDF = productDF.join(purchaseDF, (productDF.ProductId == purchaseDF.ProductId), 'inner')
resultDF.withColumn('cost', resultDF.Qty * resultDF.Price).show()

Example 2 - count

Find the number of items and total due for each purchase order by Total_cost desc

In R

inner_join(product, purchase, by="ProductId") %>% group_by(PurchaseId) %>% summarise(Num_item=n(), Total_cost=sum(Qty*Price)) %>% arrange(desc(Total_cost))
## # A tibble: 7 x 3
##   PurchaseId Num_item Total_cost
##        <dbl>    <int>      <dbl>
## 1          7        2       2995
## 2          3        1       1947
## 3          2        1       1298
## 4          5        1        798
## 5          4        1        749
## 6          1        1        649
## 7          6        1        399

In Python

joined = pd.merge(product, purchase, how='inner', on='ProductId')
joined['total_cost'] = joined['Qty'] * joined['Price']
grouped = joined.groupby('PurchaseId').agg({'Size':'count', 'total_cost':'sum'}).reset_index()
print(grouped.sort_values(by='total_cost', ascending=0))
##    PurchaseId  Size  total_cost
## 6           7     1      2995.0
## 2           3     1      1947.0
## 1           2     1      1298.0
## 4           5     0       798.0
## 3           4     1       749.0
## 0           1     1       649.0
## 5           6     0       399.0

In PySpark

productDF = spark.createDataFrame(product)
purchaseDF = spark.createDataFrame(purchase)
joined = productDF.join(purchaseDF, (productDF.ProductId == purchaseDF.ProductId), 'inner')
joined = joined.withColumn('total_cost', joined.Qty * joined.Price)
from pyspark.sql import functions as F
grouped = joined.groupBy("PurchaseId").agg(F.count('Size').alias("Num_item"), F.sum("total_cost").alias("Total_cost"))
grouped.orderBy('Total_cost', ascending=0).show()

Example 3 - count distinct

Find how many purchase made, number of unique product purchased and amount spent for each customer order by customer’s State and Name

In R

customer$Name <- as.character(customer$Name)
customer$State <- as.character(customer$State)

left_join(customer, purchase, by="CustomerId") %>% left_join(product, by="ProductId") %>% group_by(State, Name, CustomerId) %>% summarise(Num_purchase=n_distinct(PurchaseId), Num_product=sum(n_distinct(ProductId)), Total_amount=sum(Qty * Price)) %>% arrange(State)
## # A tibble: 6 x 6
## # Groups:   State, Name [6]
##   State Name    CustomerId Num_purchase Num_product Total_amount
##   <chr> <chr>        <dbl>        <int>       <int>        <dbl>
## 1 AK    Brandon          4            2           1         1197
## 2 AZ    Eric             6            1           2         2995
## 3 CA    Angela           1            1           1          649
## 4 CA    Brandon          5            1           1           NA
## 5 IL    Cathy            3            2           2         2696
## 6 TX    Diana            2            1           1         1298

In Python

joined = pd.merge(customer, purchase, how='left', on='CustomerId')
joined = pd.merge(joined, product, how='left', on='ProductId')
joined['total_cost'] = joined['Qty'] * joined['Price']
grouped = joined.groupby(['CustomerId', 'Name', 'State']).agg({'PurchaseId':'nunique', 'ProductId':'nunique', 'total_cost':'sum'}).reset_index()
grouped = grouped.rename(columns = {'PurchaseId': 'Num_purchase', 'ProductId': 'Num_product', 'total_cost':'Total_amount'})
print(grouped[['CustomerId', 'State', 'Name', 'Num_purchase', 'Num_product', 'Total_amount']].sort_values(by=['State', 'Name']))
##    CustomerId State     Name  Num_purchase  Num_product  Total_amount
## 3           4    AK  Brandon             2            1        1197.0
## 5           6    AZ     Eric             1            2        2995.0
## 0           1    CA   Angela             1            1         649.0
## 4           5    CA  Brandon             0            0           0.0
## 2           3    IL    Cathy             2            2        2696.0
## 1           2    TX    Diana             1            1        1298.0

In PySpark

customerDF = spark.createDataFrame(customer)
productDF = spark.createDataFrame(product)
purchaseDF = spark.createDataFrame(purchase)
purchaseDF = purchaseDF.withColumnRenamed("CustomerId", "CustId").withColumnRenamed("ProductId", "ProdId")
joined = customerDF.join(purchaseDF, (customerDF.CustomerId == purchaseDF.CustId), 'left')
joined = joined.join(productDF, (joined.ProdId == productDF.ProductId), 'left')
joined = joined.withColumn('total_cost', joined.Qty * joined.Price)
from pyspark.sql import functions as F
grouped = sc.parallelize(joined.groupBy(['CustomerId', 'Name', 'State']).agg(F.countDistinct('PurchaseId').alias("Num_purchase"), F.countDistinct('ProductId').alias("Num_product"), F.sum("total_cost").alias("Total_amount")).collect()).toDF()
grouped.orderBy(['State', 'Name']).show()

Example 4 - count distinct, group by, having

Find how many purchase made, number of unique product purchased and total amount spent for customer who spent more than $1000 order by Total_amount desc

In R

customer$State <- as.character(customer$State)

left_join(customer, purchase, by="CustomerId") %>% left_join(product, by="ProductId") %>% group_by(CustomerId) %>% summarise(State=min(State), Name=min(Name), Num_purchase=n_distinct(PurchaseId), Num_product=sum(n_distinct(ProductId)), Total_amount=sum(Qty * Price)) %>% filter(Total_amount > 1000) %>% arrange(desc(Total_amount))
## # A tibble: 4 x 6
##   CustomerId State Name    Num_purchase Num_product Total_amount
##        <dbl> <chr> <chr>          <int>       <int>        <dbl>
## 1          6 AZ    Eric               1           2         2995
## 2          3 IL    Cathy              2           2         2696
## 3          2 TX    Diana              1           1         1298
## 4          4 AK    Brandon            2           1         1197

In Python

joined = pd.merge(customer, purchase, how='left', on='CustomerId')
joined = pd.merge(joined, product, how='left', on='ProductId')
joined['total_cost'] = joined['Qty'] * joined['Price']
grouped = joined.groupby(['CustomerId', 'Name', 'State']).agg({'PurchaseId':'nunique', 'ProductId':'nunique', 'total_cost':'sum'}).reset_index()
grouped = grouped.rename(columns = {'PurchaseId': 'Num_purchase', 'ProductId': 'Num_product', 'total_cost':'Total_amount'})
grouped = grouped[grouped['Total_amount'] > 1000]
print(grouped[['CustomerId', 'State', 'Name', 'Num_purchase', 'Num_product', 'Total_amount']].sort_values(by=['Total_amount'], ascending=0))
##    CustomerId State     Name  Num_purchase  Num_product  Total_amount
## 5           6    AZ     Eric             1            2        2995.0
## 2           3    IL    Cathy             2            2        2696.0
## 1           2    TX    Diana             1            1        1298.0
## 3           4    AK  Brandon             2            1        1197.0

In PySpark

customerDF = spark.createDataFrame(customer)
productDF = spark.createDataFrame(product)
purchaseDF = spark.createDataFrame(purchase)
purchaseDF = purchaseDF.withColumnRenamed("CustomerId", "CustId").withColumnRenamed("ProductId", "ProdId")
joined = customerDF.join(purchaseDF, (customerDF.CustomerId == purchaseDF.CustId), 'left')
joined = joined.join(productDF, (joined.ProdId == productDF.ProductId), 'left')
joined = joined.withColumn('total_cost', joined.Qty * joined.Price)
from pyspark.sql import functions as F
grouped = joined.groupBy(['CustomerId', 'Name', 'State']).agg(F.countDistinct('PurchaseId').alias("Num_purchase"), F.countDistinct('ProductId').alias("Num_product"), F.sum("total_cost").alias("Total_amount"))
grouped.filter(grouped.Total_amount > 1000).orderBy('Total_amount', ascending=0).show()

Example 5 - count distinct

For customers purhcased iPhone6 find how many purchase made, number of unique product purchased and total amount spent for customer order total quantity desc

In R

filter(product, Product=="iPhone6") %>% inner_join(purchase, by="ProductId", "CustomerId") %>% select(CustomerId) %>% distinct(CustomerId) %>% inner_join(purchase, by="CustomerId") %>% inner_join(product, by="ProductId") %>% group_by(CustomerId) %>% summarise(Num_purchase=n_distinct(PurchaseId), Num_product=sum(n_distinct(ProductId)), Total_amount=sum(Qty * Price), Total_Qty=sum(Qty)) %>% arrange(desc(Total_Qty))
## # A tibble: 4 x 5
##   CustomerId Num_purchase Num_product Total_amount Total_Qty
##        <dbl>        <int>       <int>        <dbl>     <dbl>
## 1          6            1           2         2995         5
## 2          3            2           2         2696         4
## 3          2            1           1         1298         2
## 4          1            1           1          649         1

In Python

iPhone6_Id = product[product['Product'] == 'iPhone6']
joined = pd.merge(purchase, iPhone6_Id, how='inner', on='ProductId')
iPhone6_CustID = pd.DataFrame(joined.CustomerId.unique(), columns=['CustomerId'])
customer['CustomerId'] = customer['CustomerId'].astype(int)
iPhone6_Cust_Purchase = pd.merge(purchase, iPhone6_CustID, how='inner', on='CustomerId')
iPhone6_Cust_Product = pd.merge(iPhone6_Cust_Purchase, product, how='inner', on='ProductId')
iPhone6_Cust_Product['total_cost'] = iPhone6_Cust_Product['Qty'] * iPhone6_Cust_Product['Price']
grouped = iPhone6_Cust_Product.groupby(['CustomerId']).agg({'PurchaseId':'nunique', 'ProductId':'nunique', 'Qty':'sum', 'total_cost':'sum'}).reset_index()
grouped = grouped.rename(columns = {'PurchaseId': 'Num_purchase', 'ProductId': 'Num_product', 'Qty': 'Total_Qty', 'total_cost':'Total_amount'})
print(grouped[['CustomerId', 'Num_purchase', 'Num_product', 'Total_Qty', 'Total_amount']].sort_values(by=['Total_Qty'], ascending=0))
##    CustomerId  Num_purchase  Num_product  Total_Qty  Total_amount
## 3           6             1            2          5        2995.0
## 2           3             2            2          4        2696.0
## 1           2             1            1          2        1298.0
## 0           1             1            1          1         649.0

In PySpark

customerDF = spark.createDataFrame(customer)
productDF = spark.createDataFrame(product)
purchaseDF = spark.createDataFrame(purchase)
purchaseDF = purchaseDF.withColumnRenamed("ProductId", "ProdId")
iphone6 = sc.parallelize(productDF.filter(productDF.Product == 'iPhone6').collect()).toDF()
iphone6_custID = purchaseDF.join(iphone6, (purchaseDF.ProdId == iphone6.ProductId), 'inner').select('CustomerId').distinct().withColumnRenamed("CustomerId", "CustId")
iphone6_cust_purchase = purchaseDF.join(iphone6_custID, (purchaseDF.CustomerId == iphone6_custID.CustId), 'inner')
iPhone6_Cust_Product = productDF.join(iphone6_cust_purchase, (productDF.ProductId == iphone6_cust_purchase.ProdId), 'inner')
iPhone6_Cust_Product = iPhone6_Cust_Product.withColumn('total_cost', iPhone6_Cust_Product.Qty * iPhone6_Cust_Product.Price)
from pyspark.sql import functions as F
grouped = iPhone6_Cust_Product.groupBy(['CustomerId']).agg(F.countDistinct('PurchaseId').alias("Num_purchase"), F.countDistinct('ProductId').alias("Num_product"), F.sum('Qty').alias("Total_Qty"), F.sum("total_cost").alias("Total_amount"))
grouped.orderBy('Total_Qty', ascending=0).show()

Example 6 - cumsum, % of total within group by

For each customers list their purchase (PurchaseId level) and associated cummulative revenue order by customer state, name

In R

inner_join(purchase, product, by="ProductId") %>% transform(total=Qty*Price) %>% group_by(PurchaseId, CustomerId) %>% summarise(rev=sum(total)) %>% group_by(CustomerId) %>% mutate(cumrev=cumsum(rev)) %>% ungroup() %>% inner_join(customer) %>% arrange(State, Name, PurchaseId)
## # A tibble: 7 x 6
##   PurchaseId CustomerId   rev cumrev Name    State
##        <dbl>      <dbl> <dbl>  <dbl> <chr>   <chr>
## 1          5          4   798    798 Brandon AK   
## 2          6          4   399   1197 Brandon AK   
## 3          7          6  2995   2995 Eric    AZ   
## 4          1          1   649    649 Angela  CA   
## 5          3          3  1947   1947 Cathy   IL   
## 6          4          3   749   2696 Cathy   IL   
## 7          2          2  1298   1298 Diana   TX

In Python

joined = pd.merge(product, purchase, how='inner', on='ProductId')
joined['total_cost'] = joined['Qty'] * joined['Price']
grouped = joined.groupby(['CustomerId', 'PurchaseId']).agg({'total_cost':'sum'})#.reset_index()
grouped['cumrev'] = grouped.groupby(level=0)['total_cost'].apply(lambda x: x.cumsum())
grouped['pct'] = grouped.groupby(level=['CustomerId'])['total_cost'].apply(lambda x: 100 * x / float(x.sum()))
grouped = grouped.reset_index()
grouped.columns = ['CustomerId', 'PurchaseId', 'rev', 'cumrev', '%_cust_rev']
result = pd.merge(grouped, customer, how='inner', on='CustomerId')
print(result.sort_values(by=['State', 'Name', 'PurchaseId']))
##    CustomerId  PurchaseId     rev  cumrev  %_cust_rev     Name State
## 4           4           5   798.0   798.0   66.666667  Brandon    AK
## 5           4           6   399.0  1197.0   33.333333  Brandon    AK
## 6           6           7  2995.0  2995.0  100.000000     Eric    AZ
## 0           1           1   649.0   649.0  100.000000   Angela    CA
## 2           3           3  1947.0  1947.0   72.218101    Cathy    IL
## 3           3           4   749.0  2696.0   27.781899    Cathy    IL
## 1           2           2  1298.0  1298.0  100.000000    Diana    TX

In PySpark

customerDF = spark.createDataFrame(customer)
productDF = spark.createDataFrame(product)
purchaseDF = spark.createDataFrame(purchase)
purchaseDF = purchaseDF.withColumnRenamed("ProductId", "ProdId").withColumnRenamed("CustomerId", "CustId")
joined = productDF.join(purchaseDF, productDF.ProductId == purchaseDF.ProdId, 'inner')
joined = joined.withColumn('total_cost', joined.Qty * joined.Price)
grouped = joined.groupBy(['PurchaseId', 'CustId']).agg(F.sum('total_cost').alias("Rev"))
from pyspark.sql.window import Window
import pyspark.sql.functions as F
myPartition = Window.partitionBy(grouped['CustId']).orderBy(grouped['PurchaseId'])
grouped_cum = grouped.withColumn('CumRev', F.sum(grouped.Rev).over(myPartition)).orderBy(['CustId', 'PurchaseId'])
cust_cum = grouped_cum.join(customerDF, (grouped_cum.CustId == customerDF.CustomerId), 'inner')
cust_cum.orderBy(['State', 'Name', 'PurchaseId']).show()

Example 7 - sum(case when else end), group by

Number of iPhone sold, % of which are 32GB and 64GB

In R

inner_join(purchase, product, by="ProductId") %>% filter(Product=="iPhone6") %>% summarise(iPhone6_sold=sum(Qty), GB32=round(sum(Qty[Size==32])/sum(Qty)*100,2), GB64=round(sum(Qty[Size==64]/sum(Qty))*100,2))
##   iPhone6_sold  GB32 GB64
## 1           11 90.91 9.09

In Python

iPhone6_Id = product[product['Product'].isin(['iPhone6'])]
joined = pd.merge(purchase, iPhone6_Id, how='inner', on='ProductId')
print(joined.groupby('Product').apply(lambda x: pd.Series(dict(sold=x.Qty.sum(), GB32=x[x['Size'] == 32].Qty.sum()/float(x.Qty.sum())*100, GB64=x[x['Size'] == 64].Qty.sum()/float(x.Qty.sum())*100))))
##          sold       GB32      GB64
## Product                           
## iPhone6  11.0  90.909091  9.090909

or

iPhone6_Id = product[product['Product'].isin(['iPhone6'])]
joined = pd.merge(purchase, iPhone6_Id, how='inner', on='ProductId')
print("iPhone6_sold:", (joined.groupby(['Product']).Qty.sum())[0])
print("iPhone6_sold by size in percentage:")
joined.groupby(['Product', 'Size']).Qty.sum() / joined.groupby(['Product']).Qty.sum() * 100
## iPhone6_sold: 11
## iPhone6_sold by size in percentage:
## Product  Size
## iPhone6  32.0    90.909091
##          64.0     9.090909
## Name: Qty, dtype: float64

In PySpark

productDF = spark.createDataFrame(product)
purchaseDF = spark.createDataFrame(purchase)
purchaseDF = purchaseDF.withColumnRenamed("ProductId", "ProdId").withColumnRenamed("CustomerId", "CustId")
iphone6 = productDF.filter(productDF.Product.isin(['iPhone6']))
joined = iphone6.join(purchaseDF, iphone6.ProductId == purchaseDF.ProdId, 'inner')
import pyspark.sql.functions as F
grouped = joined.groupby('Product').agg(F.sum(joined['Qty']).alias('sold'), (F.sum(F.when(joined['Size']==32, joined['Qty']).otherwise(0))/F.sum(joined['Qty'])*100).alias('GB32'), (F.sum(F.when(joined['Size']==64, joined['Qty']).otherwise(0))/F.sum(joined['Qty'])*100).alias('GB64'))
grouped.show()