Predicting the age of abalone with Linear Regression

Background

The goal is to predict the age of abalone from physical measurements. The age of an abalone is determined by cutting the shell through the cone, staining it, and counting the number of rings under a microscope – a tedious and time-consuming task. Other measurements, which are easier to obtain, can be used to predict the age.

Problem

Build one regression model per sex to predict the number of rings.

Fetching data from the Internet

The training data is hosted on the UCI Machine Learning Repository website. Thanks to Greenplum's external tables, accessing CSV data from an HTTP server is easy.

First, connect to the database. You may need to adjust the connection string, of the form postgresql://<user>@<host>:<port>/<db_name>, to fit your environment.

[ ]:
%load_ext sql
%sql postgresql://localhost/gpadmin

Then, create the external table:

[2]:
%%sql
-- External Table
DROP EXTERNAL TABLE IF EXISTS abalone_external;
CREATE EXTERNAL WEB TABLE abalone_external(
    sex text
    , length float8
    , diameter float8
    , height float8
    , whole_weight float8
    , shucked_weight float8
    , viscera_weight float8
    , shell_weight float8
    , rings integer -- target variable to predict
) EXECUTE 'curl http://archive.ics.uci.edu/ml/machine-learning-databases/abalone/abalone.data'
format 'CSV'
(null as '?');
 * postgresql://localhost/gpadmin
Done.
Done.
[2]:
[]
[3]:
%%sql
-- Create abalone table from an external table
DROP TABLE IF EXISTS abalone;
CREATE TABLE abalone AS (
    SELECT ROW_NUMBER() OVER() AS id, *
    FROM abalone_external
) DISTRIBUTED BY (sex);
 * postgresql://localhost/gpadmin
Done.
12531 rows affected.
[3]:
[]
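
Note that the UCI source file contains 4177 rows, while the table holds 12531 = 3 × 4177. This is expected: an external web table with EXECUTE runs its command on every segment by default (ON ALL), so each segment (presumably three in this cluster) fetches a full copy of the file; Greenplum also supports EXECUTE ... ON MASTER if a single copy is wanted. A quick sanity check:

[ ]:
%%sql
-- Expect n_segments * 4177 rows with the default ON ALL
SELECT COUNT(*) FROM abalone;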

Train-Test Set Split

Before proceeding to data exploration, let's split our dataset into train and test sets.

  • First, we assign a random value between 0 and 1 to each row;

  • Then we compute, per sex, the 80th percentile of this random value;

  • Finally, we join the two tables: rows at or below their sex's threshold form the training set, and the remaining rows form the test set.

Since ordered-set aggregate functions are not yet supported by the current version of GreenplumPython, we implement this step in SQL:

[4]:
%%sql
CREATE TEMP TABLE temp_abalone_label AS
    (SELECT *, random() AS __samp_out_label FROM abalone);

CREATE TEMP TABLE train_percentile_disc AS
    (SELECT sex, percentile_disc(0.8) WITHIN GROUP (ORDER BY __samp_out_label) AS __samp_out_label
    FROM temp_abalone_label GROUP BY sex);

DROP TABLE IF EXISTS abalone_train;
CREATE TABLE abalone_train AS
    (SELECT temp_abalone_label.*
        FROM temp_abalone_label
        INNER JOIN train_percentile_disc
        ON temp_abalone_label.__samp_out_label <= train_percentile_disc.__samp_out_label
        AND temp_abalone_label.sex = train_percentile_disc.sex
    );
DROP TABLE IF EXISTS abalone_test;
CREATE TABLE abalone_test AS
    (SELECT temp_abalone_label.*
        FROM temp_abalone_label
        INNER JOIN train_percentile_disc
        ON temp_abalone_label.__samp_out_label > train_percentile_disc.__samp_out_label
        AND temp_abalone_label.sex = train_percentile_disc.sex
    );
 * postgresql://localhost/gpadmin
12531 rows affected.
3 rows affected.
Done.
10026 rows affected.
Done.
2505 rows affected.
[4]:
[]

Note that these features could be supported by GreenplumPython in a future release.

Preparation

Connect to the Greenplum database named gpadmin:

[5]:
import greenplumpython as gp
[6]:
db = gp.database("postgresql://localhost/gpadmin")

Data Exploration

Get access to the existing table abalone:

[7]:
abalone = db.create_dataframe(table_name="abalone")

Inspect the table:

[8]:
# SELECT * FROM abalone ORDER BY id LIMIT 5;

abalone.order_by("id")[:5]
[8]:
id sex length diameter height whole_weight shucked_weight viscera_weight shell_weight rings
1 M 0.455 0.365 0.095 0.514 0.2245 0.101 0.15 15
2 M 0.35 0.265 0.09 0.2255 0.0995 0.0485 0.07 7
3 F 0.53 0.42 0.135 0.677 0.2565 0.1415 0.21 9
4 M 0.44 0.365 0.125 0.516 0.2155 0.114 0.155 10
5 I 0.33 0.255 0.08 0.205 0.0895 0.0395 0.055 7

Observe the distribution of the data across the Greenplum segments:

[9]:
# SELECT gp_execution_segment() AS gp_segment_id, COUNT(*)
# FROM abalone
# GROUP BY 1;

import greenplumpython.builtins.functions as F

abalone.assign(gp_segment_id=lambda _: gp.function("gp_execution_segment")()).group_by(
    "gp_segment_id"
).apply(lambda _: F.count())
[9]:
count gp_segment_id
3921 2
8610 1

Since we already have the tables abalone_train and abalone_test in the database, we can access them directly.

[10]:
abalone_train = db.create_dataframe(table_name="abalone_train")
abalone_test = db.create_dataframe(table_name="abalone_test")
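
As a quick optional check, the same group_by/apply pattern as above can report per-sex row counts for each split (a sketch; output not shown here):

[ ]:
# Per-sex row counts of the training set; repeat with abalone_test for the test set.
abalone_train.group_by("sex").apply(lambda _: F.count())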

Learning to Make Predictions

Training Model with Linear Regression

Creation of training function


NOTE: Since linreg_func will be executed inside the Greenplum database, all of its Python dependencies (scikit-learn, NumPy, etc.) need to be installed on the Greenplum hosts and discoverable by the PL/Python extension. See the Greenplum PL/Python Language documentation for more information.

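To check that PL/Python is available, one option is to query the standard pg_language catalog (if the language is missing, it can typically be added with CREATE EXTENSION plpython3u):

[ ]:
%%sql
-- Expect a row for plpython3u if the language is installed
SELECT lanname FROM pg_language WHERE lanname LIKE 'plpython%';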

[11]:
from typing import List

# CREATE TYPE linreg_type AS (
#    col_nm text[]
#    , coef float8[]
#    , intercept float8
#    , serialized_linreg_model bytea
#    , created_dt text
# );

import dataclasses
import datetime


@dataclasses.dataclass
class LinregType:
    col_nm: List[str]
    coef: List[float]
    intercept: float
    serialized_linreg_model: bytes
    created_dt: str


# -- Create function
# -- Need to specify the return type -> API will create the corresponding type in Greenplum to return a row
# -- Will add argument to change language extensions, currently plpython3u by default

from sklearn.linear_model import LinearRegression
import numpy as np
import pickle


@gp.create_column_function
def linreg_func(length: List[float], shucked_weight: List[float], rings: List[int]) -> LinregType:
    X = np.array([length, shucked_weight]).T
    y = np.array([rings]).T

    # OLS linear regression with length, shucked_weight
    linreg_fit = LinearRegression().fit(X, y)
    linreg_coef = linreg_fit.coef_
    linreg_intercept = linreg_fit.intercept_

    # Serialization of the fitted model
    serialized_linreg_model = pickle.dumps(linreg_fit, protocol=3)

    return LinregType(
        col_nm=["length", "shucked_weight"],
        coef=linreg_coef[0],
        intercept=linreg_intercept[0],
        serialized_linreg_model=serialized_linreg_model,
        created_dt=str(datetime.datetime.now()),
    )

Apply the linreg_func column function to our train set. Each argument column is aggregated into an array per group (here, per sex), just like the ARRAY_AGG calls in the equivalent SQL:

[12]:
# DROP TABLE IF EXISTS plc_linreg_fitted;
# CREATE TABLE plc_linreg_fitted AS (
#    SELECT
#        a.sex
#        , (plc_linreg_func(
#            a.length_agg
#            , a.shucked_weight_agg
#            , a.rings_agg)
#        ).*
#    FROM (
#        SELECT
#            sex
#            , ARRAY_AGG(length) AS length_agg
#            , ARRAY_AGG(shucked_weight) AS shucked_weight_agg
#            , ARRAY_AGG(rings) AS rings_agg
#        FROM abalone_split
#        WHERE split = 1
#        GROUP BY sex
#    ) a
# ) DISTRIBUTED BY (sex);

linreg_fitted = abalone_train.group_by("sex").apply(
    lambda t: linreg_func(t["length"], t["shucked_weight"], t["rings"]), expand=True
)

Take a look at the fitted models:

[13]:
linreg_fitted[["sex", "col_nm", "coef", "intercept", "created_dt"]]
[13]:
sex col_nm coef intercept created_dt
F ['length', 'shucked_weight'] [25.9425725980866, -8.40468544064155] -0.145930582701281 2023-02-23 20:58:21.485528
I ['length', 'shucked_weight'] [15.4355302117113, 0.370827174250677] 1.23206474957206 2023-02-23 20:58:21.485653
M ['length', 'shucked_weight'] [22.6450273736554, -6.02938247876165] 0.569975195099637 2023-02-23 20:58:21.489115

Summary of Parameters

Using ARRAY_APPEND:

[14]:
# SELECT
#   sex,
#   UNNEST(ARRAY_APPEND(col_nm, 'intercept')) AS col_nm2,
#   UNNEST(ARRAY_APPEND(coef, intercept)) AS coef2
# from linreg_fitted;

unnest = gp.function("unnest")
array_append = gp.function("array_append")

linreg_fitted.assign(
    col_nm2=lambda t: unnest(array_append(t["col_nm"], "intercept")),
    coef2=lambda t: unnest(array_append(t["coef"], t["intercept"])),
)[["sex", "col_nm2", "coef2"]]
[14]:
sex col_nm2 coef2
F length 25.9425725980866
F shucked_weight -8.40468544064155
F intercept -0.145930582701292
I length 15.4355302117113
I shucked_weight 0.370827174250674
I intercept 1.23206474957206
M length 22.6450273736554
M shucked_weight -6.02938247876165
M intercept 0.569975195099643

Prediction

Creation of prediction function

[15]:
@gp.create_function
def linreg_pred_func(serialized_model: bytes, length: float, shucked_weight: float) -> float:
    # Deserialize the serialized model
    model = pickle.loads(serialized_model)
    features = [length, shucked_weight]
    # Predict the target variable
    y_pred = model.predict([features])
    return y_pred[0][0]
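
To see what this round-trip does, here is a minimal local sketch (plain Python, outside the database) with made-up numbers: fit a toy model, pickle it, unpickle it, and predict.

[ ]:
# Toy model on two fabricated points; predicting their midpoint gives ~9.0.
toy = LinearRegression().fit([[0.4, 0.2], [0.6, 0.3]], [[8], [10]])
blob = pickle.dumps(toy, protocol=3)
print(pickle.loads(blob).predict([[0.5, 0.25]])[0][0])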

Join the model dataframe with the test set dataframe; self_columns selects which columns of linreg_fitted to carry over (the duplicated sex join key is omitted):

[16]:
linreg_test_fit = linreg_fitted.inner_join(
    abalone_test,
    cond=lambda t1, t2: t1["sex"] == t2["sex"],
    self_columns=["col_nm", "coef", "intercept", "serialized_linreg_model", "created_dt"],
)

Predict the test set:

[17]:
# CREATE TABLE plc_linreg_pred_dot AS (
#    SELECT
#        test.id
#        , test.sex
#        , test.rings
#        , plc_linreg_pred_dot_func(
#            model.coef
#            , model.intercept
#            , ARRAY[length, shucked_weight]
#        ) AS y_pred
#    FROM
#        (SELECT * FROM abalone_split WHERE split=0) AS test
#        , plc_linreg_fitted AS model
#    WHERE test.sex = model.sex
# ) DISTRIBUTED BY (sex);


linreg_pred = linreg_test_fit.assign(
    rings_pred=lambda t: linreg_pred_func(
        t["serialized_linreg_model"],
        t["length"],
        t["shucked_weight"],
    ),
)[["id", "sex", "rings", "rings_pred"]]
[18]:
# SELECT * FROM plc_linreg_pred WHERE sex='I' ORDER BY id LIMIT 5;

linreg_pred[lambda t: t["sex"] == "I"].order_by("id")[:5]
[18]:
id sex rings rings_pred
22 I 10 7.1272324039624
45 I 4 4.48001556958082
106 I 7 6.35897875153222
107 I 8 7.8444517211187
150 I 6 6.27660952003415

Evaluation of model

Creation of evaluation return type

[19]:
# CREATE TYPE plc_linreg_eval_type AS (
#    mae float8
#    , mape float8
#    , mse float8
#    , r2_score float8
# );


@dataclasses.dataclass
class linreg_eval_type:
    mae: float
    mape: float
    mse: float
    r2_score: float

Creation of evaluation function

[20]:
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score


@gp.create_column_function
def linreg_eval(y_actual: List[float], y_pred: List[float]) -> linreg_eval_type:
    mae = mean_absolute_error(y_actual, y_pred)
    mse = mean_squared_error(y_actual, y_pred)
    r2_score_ = r2_score(y_actual, y_pred)

    # MAPE is computed by hand since mean_absolute_percentage_error
    # is only available in recent scikit-learn versions.
    y_actual_f = np.array(y_actual, dtype=float)
    y_pred_f = np.array(y_pred, dtype=float)
    mape = 100 * np.mean(np.abs(y_actual_f - y_pred_f) / y_actual_f)
    return linreg_eval_type(mae, mape, mse, r2_score_)
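
These metrics can be sanity-checked locally on a tiny hand-made example (values chosen so the expected results are easy to verify by hand):

[ ]:
ya, yp = np.array([10.0, 8.0]), np.array([9.0, 9.0])
print(mean_absolute_error(ya, yp))          # 1.0
print(mean_squared_error(ya, yp))           # 1.0
print(r2_score(ya, yp))                     # 0.0
print(100 * np.mean(np.abs(ya - yp) / ya))  # 11.25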

Evaluate our model on the test set:

[21]:
# SELECT
#    sex
#    , (linreg_eval(rings_agg, y_pred_agg)).*
# FROM (
#    SELECT
#        sex
#        , ARRAY_AGG(rings) AS rings_agg
#        , ARRAY_AGG(y_pred) AS y_pred_agg
#    FROM plc_linreg_pred
#    GROUP BY sex
# ) a


linreg_pred.group_by("sex").apply(lambda t: linreg_eval(t["rings"], t["rings_pred"]), expand=True)
[21]:
sex mae mape mse r2_score
F 2.14618814657571 18.7708049967624 8.68959806015986 0.127677315539933
I 1.21058504580913 14.6104201739291 3.30775897060488 0.480766554461037
M 2.03231596723483 19.0254559285428 7.32630298752111 0.199558502231448
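
Judging by the R² scores, the two-feature model fits the infant (I) group best and leaves most of the variance unexplained for females and males; bringing the remaining physical measurements into the model would be a natural next step.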