Predicting the age of abalone with Linear Regression
Background
Predicting the age of abalone from physical measurements. The age of abalone is determined by cutting the shell through the cone, staining it, and counting the number of rings through a microscope – a boring and time-consuming task. Other measurements, which are easier to obtain, are used to predict the age.
Problem
Build regression models by sex which can predict the number of rings.
Fetching data from the Internet
The training data is located on the website. Thanks for Greenplum’s external table, accessing CSV data from a HTTP server is easy.
First, connect to the database. You may want to change the connect string like postgresql://<user>@<host>:<port>/<db_name>
to fit your needs.
[ ]:
%load_ext sql
%sql postgresql://localhost/gpadmin
Then, create the external table:
[2]:
%%sql
-- External Table
DROP EXTERNAL TABLE IF EXISTS abalone_external;
CREATE EXTERNAL WEB TABLE abalone_external(
sex text
, length float8
, diameter float8
, height float8
, whole_weight float8
, shucked_weight float8
, viscera_weight float8
, shell_weight float8
, rings integer -- target variable to predict
) EXECUTE 'curl http://archive.ics.uci.edu/ml/machine-learning-databases/abalone/abalone.data'
format 'CSV'
(null as '?');
* postgresql://localhost/gpadmin
Done.
Done.
[2]:
[]
[3]:
%%sql
-- Create abalone table from an external table
DROP TABLE IF EXISTS abalone;
CREATE TABLE abalone AS (
SELECT ROW_NUMBER() OVER() AS id, *
FROM abalone_external
) DISTRIBUTED BY (sex);
* postgresql://localhost/gpadmin
Done.
12531 rows affected.
[3]:
[]
Train-Test Set Split
Before proceeding data exploration, let’s split our dataset to train and test set.
Firstly, we fetch a random value between 0 and 1 to each row;
Then we create a percentile table that stores percentile values for each sex;
Finally, we join those 2 tables to obtain our training or test tables.
But since Ordered-Set Aggregate Function is not yet supported by the current version, we will skip this step with GreenplumPython and implement it with SQL.
[4]:
%%sql
CREATE TEMP TABLE temp_abalone_label AS
(SELECT *, random() AS __samp_out_label FROM abalone);
CREATE TEMP TABLE train_percentile_disc AS
(SELECT sex, percentile_disc(0.8) within GROUP (ORDER BY __samp_out_label) AS __samp_out_label
FROM temp_abalone_label GROUP BY sex);
CREATE TEMP TABLE test_percentile_disc AS
(SELECT sex, percentile_disc(0.2) within GROUP (ORDER BY __samp_out_label) AS __samp_out_label
FROM temp_abalone_label GROUP BY sex);
DROP TABLE IF EXISTS abalone_train;
CREATE TABLE abalone_train AS
(SELECT temp_abalone_label.*
FROM temp_abalone_label
INNER JOIN train_percentile_disc
ON temp_abalone_label.__samp_out_label <= train_percentile_disc.__samp_out_label
AND temp_abalone_label.sex = train_percentile_disc.sex
);
DROP TABLE IF EXISTS abalone_test;
CREATE TABLE abalone_test AS
(SELECT temp_abalone_label.*
FROM temp_abalone_label
INNER JOIN test_percentile_disc
ON temp_abalone_label.__samp_out_label <= test_percentile_disc.__samp_out_label
AND temp_abalone_label.sex = test_percentile_disc.sex
)
* postgresql://localhost/gpadmin
12531 rows affected.
3 rows affected.
3 rows affected.
Done.
10026 rows affected.
Done.
2508 rows affected.
[4]:
[]
Note that these features could be supported by GreenplumPython in future release.
Import preparation
Connect to Greenplum database named gpadmin
[5]:
import greenplumpython as gp
[6]:
db = gp.database("postgresql://localhost/gpadmin")
Data Exploration
Get access to the existing table abalone
:
[7]:
abalone = db.create_dataframe(table_name="abalone")
Inspect the table:
[8]:
# SELECT * FROM abalone ORDER BY id LIMIT 5;
abalone.order_by("id")[:5]
[8]:
id | sex | length | diameter | height | whole_weight | shucked_weight | viscera_weight | shell_weight | rings |
---|---|---|---|---|---|---|---|---|---|
1 | M | 0.455 | 0.365 | 0.095 | 0.514 | 0.2245 | 0.101 | 0.15 | 15 |
2 | M | 0.35 | 0.265 | 0.09 | 0.2255 | 0.0995 | 0.0485 | 0.07 | 7 |
3 | F | 0.53 | 0.42 | 0.135 | 0.677 | 0.2565 | 0.1415 | 0.21 | 9 |
4 | M | 0.44 | 0.365 | 0.125 | 0.516 | 0.2155 | 0.114 | 0.155 | 10 |
5 | I | 0.33 | 0.255 | 0.08 | 0.205 | 0.0895 | 0.0395 | 0.055 | 7 |
Observe the distribution of data on different segments:
[9]:
# SELECT gp_execution_segment() AS gp_segment_id, COUNT(*)
# FROM abalone
# GROUP BY 1;
import greenplumpython.builtins.functions as F
abalone.assign(gp_segment_id=lambda _: gp.function("gp_execution_segment")()).group_by(
"gp_segment_id"
).apply(lambda _: F.count())
[9]:
count | gp_segment_id |
---|---|
3921 | 2 |
8610 | 1 |
Since we already have table abalone_train
ad abalone_test
in the database, we can get access to them.
[10]:
abalone_train = db.create_dataframe(table_name="abalone_train")
abalone_test = db.create_dataframe(table_name="abalone_test")
Learning to Make Predictions
Training Model with Linear Regression
Creation of training function
NOTE: Since the linreg_func
will be executed on Greenplum Database, all the python dependencies (scikit-learn
/numpy
/etc.) needs to be discoverable by the plpython
extension. Please check Greenplum PL/Python Language for more information.
[11]:
from typing import List
# CREATE TYPE linreg_type AS (
# col_nm text[]
# , coef float8[]
# , intercept float8
# , serialized_linreg_model bytea
# , created_dt text
# );
import dataclasses
import datetime
@dataclasses.dataclass
class LinregType:
col_nm: List[str]
coef: List[float]
intercept: float
serialized_linreg_model: bytes
created_dt: str
# -- Create function
# -- Need to specify the return type -> API will create the corresponding type in Greenplum to return a row
# -- Will add argument to change language extensions, currently plpython3u by default
from sklearn.linear_model import LinearRegression
import numpy as np
import pickle
@gp.create_column_function
def linreg_func(length: List[float], shucked_weight: List[float], rings: List[int]) -> LinregType:
X = np.array([length, shucked_weight]).T
y = np.array([rings]).T
# OLS linear regression with length, shucked_weight
linreg_fit = LinearRegression().fit(X, y)
linreg_coef = linreg_fit.coef_
linreg_intercept = linreg_fit.intercept_
# Serialization of the fitted model
serialized_linreg_model = pickle.dumps(linreg_fit, protocol=3)
return LinregType(
col_nm=["length", "shucked_weight"],
coef=linreg_coef[0],
intercept=linreg_intercept[0],
serialized_linreg_model=serialized_linreg_model,
created_dt=str(datetime.datetime.now()),
)
Apply linreg_fitted
function to our train set:
[12]:
# DROP TABLE IF EXISTS plc_linreg_fitted;
# CREATE TABLE plc_linreg_fitted AS (
# SELECT
# a.sex
# , (plc_linreg_func(
# a.length_agg
# , a.shucked_weight_agg
# , a.rings_agg)
# ).*
# FROM (
# SELECT
# sex
# , ARRAY_AGG(length) AS length_agg
# , ARRAY_AGG(shucked_weight) AS shucked_weight_agg
# , ARRAY_AGG(rings) AS rings_agg
# FROM abalone_split
# WHERE split = 1
# GROUP BY sex
# ) a
# ) DISTRIBUTED BY (sex);
linreg_fitted = abalone_train.group_by("sex").apply(
lambda t: linreg_func(t["length"], t["shucked_weight"], t["rings"]), expand=True
)
Take a look at models built:
[13]:
linreg_fitted[["sex", "col_nm", "coef", "intercept", "created_dt"]]
[13]:
sex | col_nm | coef | intercept | created_dt |
---|---|---|---|---|
F | ['length', 'shucked_weight'] | [25.9425725980866, -8.40468544064155] | -0.145930582701281 | 2023-02-23 20:58:21.485528 |
I | ['length', 'shucked_weight'] | [15.4355302117113, 0.370827174250677] | 1.23206474957206 | 2023-02-23 20:58:21.485653 |
M | ['length', 'shucked_weight'] | [22.6450273736554, -6.02938247876165] | 0.569975195099637 | 2023-02-23 20:58:21.489115 |
Summary of Parameters
Using ARRAY_APPEND
:
[14]:
# SELECT
# sex,
# UNNEST(ARRAY_APPEND(col_nm, 'intercept')) AS col_nm2,
# UNNEST(ARRAY_APPEND(coef, intercept)) AS coef2
# from linreg_fitted;
unnest = gp.function("unnest")
array_append = gp.function("array_append")
linreg_fitted.assign(
col_nm2=lambda t: unnest(array_append(t["col_nm"], "intercept")),
coef2=lambda t: unnest(array_append(t["coef"], t["intercept"])),
)[["sex", "col_nm2", "coef2"]]
[14]:
sex | col_nm2 | coef2 |
---|---|---|
F | length | 25.9425725980866 |
F | shucked_weight | -8.40468544064155 |
F | intercept | -0.145930582701292 |
I | length | 15.4355302117113 |
I | shucked_weight | 0.370827174250674 |
I | intercept | 1.23206474957206 |
M | length | 22.6450273736554 |
M | shucked_weight | -6.02938247876165 |
M | intercept | 0.569975195099643 |
Prediction
Creation of prediction function
[15]:
@gp.create_function
def linreg_pred_func(serialized_model: bytes, length: float, shucked_weight: float) -> float:
# Deserialize the serialized model
model = pickle.loads(serialized_model)
features = [length, shucked_weight]
# Predict the target variable
y_pred = model.predict([features])
return y_pred[0][0]
Join model dataframe and test set dataframe
[16]:
linreg_test_fit = linreg_fitted.inner_join(
abalone_test,
cond=lambda t1, t2: t1["sex"] == t2["sex"],
self_columns=["col_nm", "coef", "intercept", "serialized_linreg_model", "created_dt"],
)
Predict test set
[17]:
# CREATE TABLE plc_linreg_pred_dot AS (
# SELECT
# test.id
# , test.sex
# , test.rings
# , plc_linreg_pred_dot_func(
# model.coef
# , model.intercept
# , ARRAY[length, shucked_weight]
# ) AS y_pred
# FROM
# (SELECT * FROM abalone_split WHERE split=0) AS test
# , plc_linreg_fitted AS model
# WHERE test.sex = model.sex
# ) DISTRIBUTED BY (sex);
linreg_pred = linreg_test_fit.assign(
rings_pred=lambda t: linreg_pred_func(
t["serialized_linreg_model"],
t["length"],
t["shucked_weight"],
),
)[["id", "sex", "rings", "rings_pred"]]
[18]:
# SELECT * FROM plc_linreg_pred WHERE sex='I' ORDER BY id LIMIT 5;
linreg_pred[lambda t: t["sex"] == "I"].order_by("id")[:5]
[18]:
id | sex | rings | rings_pred |
---|---|---|---|
22 | I | 10 | 7.1272324039624 |
45 | I | 4 | 4.48001556958082 |
106 | I | 7 | 6.35897875153222 |
107 | I | 8 | 7.8444517211187 |
150 | I | 6 | 6.27660952003415 |
Evaluation of model
Creation of evaluation return type
[19]:
# CREATE TYPE plc_linreg_eval_type AS (
# mae float8
# , mape float8
# , mse float8
# , r2_score float8
# );
@dataclasses.dataclass
class linreg_eval_type:
mae: float
mape: float
mse: float
r2_score: float
Creation of evaluation function
[20]:
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
@gp.create_column_function
def linreg_eval(y_actual: List[float], y_pred: List[float]) -> linreg_eval_type:
mae = mean_absolute_error(y_actual, y_pred)
mse = mean_squared_error(y_actual, y_pred)
r2_score_ = r2_score(y_actual, y_pred)
y_pred_f = np.array(y_pred, dtype=float)
mape = 100 * sum(abs(y_actual - y_pred_f) / y_actual) / len(y_actual)
return linreg_eval_type(mae, mape, mse, r2_score_)
Evaluate our model
[21]:
# SELECT
# sex
# , (linreg_eval(rings_agg, y_pred_agg)).*
# FROM (
# SELECT
# sex
# , ARRAY_AGG(rings) AS rings_agg
# , ARRAY_AGG(y_pred) AS y_pred_agg
# FROM plc_linreg_pred
# GROUP BY sex
# ) a
linreg_pred.group_by("sex").apply(lambda t: linreg_eval(t["rings"], t["rings_pred"]), expand=True)
[21]:
sex | mae | mape | mse | r2_score |
---|---|---|---|---|
F | 2.14618814657571 | 18.7708049967624 | 8.68959806015986 | 0.127677315539933 |
I | 1.21058504580913 | 14.6104201739291 | 3.30775897060488 | 0.480766554461037 |
M | 2.03231596723483 | 19.0254559285428 | 7.32630298752111 | 0.199558502231448 |