{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Introducing ML package of PySpark" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Predict chances of infant survival with ML" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Load the data" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "First, we load the data." ] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [], "source": [ "from pyspark.sql import SparkSession\n", "import pyspark.sql.types as typ\n", "\n", "spark = SparkSession.builder.master(\"local\").getOrCreate()\n", "\n", "labels = [\n", " ('INFANT_ALIVE_AT_REPORT', typ.IntegerType()),\n", " ('BIRTH_PLACE', typ.StringType()),\n", " ('MOTHER_AGE_YEARS', typ.IntegerType()),\n", " ('FATHER_COMBINED_AGE', typ.IntegerType()),\n", " ('CIG_BEFORE', typ.IntegerType()),\n", " ('CIG_1_TRI', typ.IntegerType()),\n", " ('CIG_2_TRI', typ.IntegerType()),\n", " ('CIG_3_TRI', typ.IntegerType()),\n", " ('MOTHER_HEIGHT_IN', typ.IntegerType()),\n", " ('MOTHER_PRE_WEIGHT', typ.IntegerType()),\n", " ('MOTHER_DELIVERY_WEIGHT', typ.IntegerType()),\n", " ('MOTHER_WEIGHT_GAIN', typ.IntegerType()),\n", " ('DIABETES_PRE', typ.IntegerType()),\n", " ('DIABETES_GEST', typ.IntegerType()),\n", " ('HYP_TENS_PRE', typ.IntegerType()),\n", " ('HYP_TENS_GEST', typ.IntegerType()),\n", " ('PREV_BIRTH_PRETERM', typ.IntegerType())\n", "]\n", "\n", "schema = typ.StructType([\n", " typ.StructField(e[0], e[1], False) for e in labels\n", "])\n", "\n", "births = spark.read.csv('births_transformed.csv.gz', \n", " header=True, \n", " schema=schema)" ] }, { "cell_type": "code", "execution_count": 16, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----------------------+-----------+----------------+-------------------+----------+---------+---------+---------+----------------+-----------------+----------------------+------------------+------------+-------------+------------+-------------+------------------+\n", "|INFANT_ALIVE_AT_REPORT|BIRTH_PLACE|MOTHER_AGE_YEARS|FATHER_COMBINED_AGE|CIG_BEFORE|CIG_1_TRI|CIG_2_TRI|CIG_3_TRI|MOTHER_HEIGHT_IN|MOTHER_PRE_WEIGHT|MOTHER_DELIVERY_WEIGHT|MOTHER_WEIGHT_GAIN|DIABETES_PRE|DIABETES_GEST|HYP_TENS_PRE|HYP_TENS_GEST|PREV_BIRTH_PRETERM|\n", "+----------------------+-----------+----------------+-------------------+----------+---------+---------+---------+----------------+-----------------+----------------------+------------------+------------+-------------+------------+-------------+------------------+\n", "| 0| 1| 29| 99| 0| 0| 0| 0| 99| 999| 999| 99| 0| 0| 0| 0| 0|\n", "| 0| 1| 22| 29| 0| 0| 0| 0| 65| 180| 198| 18| 0| 0| 0| 0| 0|\n", "| 0| 1| 38| 40| 0| 0| 0| 0| 63| 155| 167| 12| 0| 0| 0| 0| 0|\n", "| 0| 1| 39| 42| 0| 0| 0| 0| 60| 128| 152| 24| 0| 0| 0| 0| 1|\n", "| 0| 1| 18| 99| 6| 4| 2| 2| 61| 110| 130| 20| 0| 0| 0| 0| 0|\n", "+----------------------+-----------+----------------+-------------------+----------+---------+---------+---------+----------------+-----------------+----------------------+------------------+------------+-------------+------------+-------------+------------------+\n", "only showing top 5 rows\n", "\n", "root\n", " |-- INFANT_ALIVE_AT_REPORT: integer (nullable = true)\n", " |-- BIRTH_PLACE: string (nullable = true)\n", " |-- MOTHER_AGE_YEARS: integer (nullable = true)\n", " |-- FATHER_COMBINED_AGE: integer (nullable = true)\n", " |-- CIG_BEFORE: integer (nullable = true)\n", " |-- CIG_1_TRI: integer (nullable = true)\n", " |-- CIG_2_TRI: 
integer (nullable = true)\n", " |-- CIG_3_TRI: integer (nullable = true)\n", " |-- MOTHER_HEIGHT_IN: integer (nullable = true)\n", " |-- MOTHER_PRE_WEIGHT: integer (nullable = true)\n", " |-- MOTHER_DELIVERY_WEIGHT: integer (nullable = true)\n", " |-- MOTHER_WEIGHT_GAIN: integer (nullable = true)\n", " |-- DIABETES_PRE: integer (nullable = true)\n", " |-- DIABETES_GEST: integer (nullable = true)\n", " |-- HYP_TENS_PRE: integer (nullable = true)\n", " |-- HYP_TENS_GEST: integer (nullable = true)\n", " |-- PREV_BIRTH_PRETERM: integer (nullable = true)\n", "\n" ] } ], "source": [ "births.show(5)\n", "births.printSchema()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create transformers" ] }, { "cell_type": "code", "execution_count": 17, "metadata": {}, "outputs": [], "source": [ "import pyspark.ml.feature as ft\n", "\n", "# OneHotEncoder requires a numeric input, so cast BIRTH_PLACE to an integer first\n", "births = births \\\n", " .withColumn('BIRTH_PLACE_INT', \n", " births['BIRTH_PLACE'] \\\n", " .cast(typ.IntegerType()))" ] }, { "cell_type": "code", "execution_count": 19, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- INFANT_ALIVE_AT_REPORT: integer (nullable = true)\n", " |-- BIRTH_PLACE: string (nullable = true)\n", " |-- MOTHER_AGE_YEARS: integer (nullable = true)\n", " |-- FATHER_COMBINED_AGE: integer (nullable = true)\n", " |-- CIG_BEFORE: integer (nullable = true)\n", " |-- CIG_1_TRI: integer (nullable = true)\n", " |-- CIG_2_TRI: integer (nullable = true)\n", " |-- CIG_3_TRI: integer (nullable = true)\n", " |-- MOTHER_HEIGHT_IN: integer (nullable = true)\n", " |-- MOTHER_PRE_WEIGHT: integer (nullable = true)\n", " |-- MOTHER_DELIVERY_WEIGHT: integer (nullable = true)\n", " |-- MOTHER_WEIGHT_GAIN: integer (nullable = true)\n", " |-- DIABETES_PRE: integer (nullable = true)\n", " |-- DIABETES_GEST: integer (nullable = true)\n", " |-- HYP_TENS_PRE: integer (nullable = true)\n", " |-- HYP_TENS_GEST: integer (nullable = true)\n", " |-- PREV_BIRTH_PRETERM: integer (nullable = true)\n", " |-- BIRTH_PLACE_INT: integer (nullable = true)\n", "\n" ] } ], "source": [ "births.printSchema()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Having done this, we can now create our first `Transformer`. Note that in recent Spark releases (3.0+), `OneHotEncoder` is technically an `Estimator`: it first has to be `.fit(...)` to the data, which the `Pipeline` we build later handles for us." ] }, { "cell_type": "code", "execution_count": 20, "metadata": {}, "outputs": [], "source": [ "encoder = ft.OneHotEncoder(\n", " inputCol='BIRTH_PLACE_INT', \n", " outputCol='BIRTH_PLACE_VEC')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's now create a single column with all the features collated together." ] }, { "cell_type": "code", "execution_count": 23, "metadata": {}, "outputs": [], "source": [ "featuresCreator = ft.VectorAssembler(\n", " inputCols=[col[0] for col in labels[2:]] + \\\n", " [encoder.getOutputCol()], \n", " outputCol='features'\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create an estimator" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In this example we will (once again) use the logistic regression model." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import pyspark.ml.classification as cl"
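] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Aside: if you are ever unsure which hyper-parameters an estimator exposes, you can ask it directly. The following optional check simply prints the parameters of `LogisticRegression` together with their documentation and default values." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# List every parameter of LogisticRegression with its doc string and default\n", "print(cl.LogisticRegression().explainParams())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Once loaded, let's create the model."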
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "logistic = cl.LogisticRegression(\n", " maxIter=10, \n", " regParam=0.01, \n", " labelCol='INFANT_ALIVE_AT_REPORT')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create a pipeline" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "All that is left now is to create a `Pipeline` and fit the model. First, let's load the `Pipeline` from the package." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from pyspark.ml import Pipeline\n", "\n", "pipeline = Pipeline(stages=[\n", " encoder, \n", " featuresCreator, \n", " logistic\n", " ])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Fit the model" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Conveniently, the `DataFrame` API has the `.randomSplit(...)` method." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "births_train, births_test = births \\\n", " .randomSplit([0.7, 0.3], seed=666)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now we run our `pipeline` and estimate our model." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "model = pipeline.fit(births_train)\n", "test_model = model.transform(births_test)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Here's what the `test_model` looks like." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "test_model.take(1)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Model performance" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Naturally, we would now like to test how well our model did." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import pyspark.ml.evaluation as ev\n", "\n", "# The 'probability' column can serve as the raw prediction for this evaluator\n", "evaluator = ev.BinaryClassificationEvaluator(\n", " rawPredictionCol='probability', \n", " labelCol='INFANT_ALIVE_AT_REPORT')\n", "\n", "print(evaluator.evaluate(test_model, \n", " {evaluator.metricName: 'areaUnderROC'}))\n", "print(evaluator.evaluate(test_model, \n", " {evaluator.metricName: 'areaUnderPR'}))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Saving the model" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "PySpark allows you to save the `Pipeline` definition for later use." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "pipelinePath = './infant_oneHotEncoder_Logistic_Pipeline'\n", "pipeline.write().overwrite().save(pipelinePath)"
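] }, { "cell_type": "markdown", "metadata": {}, "source": [ "On a local filesystem you can peek at what was written; Spark ML persistence stores the pipeline as a small directory of JSON metadata (you should typically see a `metadata` folder and a `stages` folder holding the parameters of each stage). A quick, optional check, assuming a local run:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import os\n", "\n", "# The pipeline definition is persisted as a small directory tree\n", "print(sorted(os.listdir(pipelinePath)))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "So, you can load it up later and use it straight away to `.fit(...)` and predict."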
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "loadedPipeline = Pipeline.load(pipelinePath)\n", "loadedPipeline \\\n", " .fit(births_train)\\\n", " .transform(births_test)\\\n", " .take(1)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "You can also save the whole estimated `PipelineModel`." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from pyspark.ml import PipelineModel\n", "\n", "modelPath = './infant_oneHotEncoder_Logistic_PipelineModel'\n", "model.write().overwrite().save(modelPath)\n", "\n", "loadedPipelineModel = PipelineModel.load(modelPath)\n", "test_loadedModel = loadedPipelineModel.transform(births_test)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Parameter hyper-tuning" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Grid search" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Load the `.tuning` part of the package." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import pyspark.ml.tuning as tune" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Next, let's specify our model and the list of parameters we want to loop through." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "logistic = cl.LogisticRegression(\n", " labelCol='INFANT_ALIVE_AT_REPORT')\n", "\n", "grid = tune.ParamGridBuilder() \\\n", " .addGrid(logistic.maxIter, \n", " [2, 10, 50]) \\\n", " .addGrid(logistic.regParam, \n", " [0.01, 0.05, 0.3]) \\\n", " .build()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Next, we need some way of comparing the models." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "evaluator = ev.BinaryClassificationEvaluator(\n", " rawPredictionCol='probability', \n", " labelCol='INFANT_ALIVE_AT_REPORT')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Create the logic that will do the validation work for us." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "cv = tune.CrossValidator(\n", " estimator=logistic, \n", " estimatorParamMaps=grid, \n", " evaluator=evaluator\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Create a purely transforming `Pipeline`." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "pipeline = Pipeline(stages=[encoder, featuresCreator])\n", "data_transformer = pipeline.fit(births_train)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Having done this, we are ready to find the optimal combination of parameters for our model." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "cvModel = cv.fit(data_transformer.transform(births_train))"
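] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Be aware of what that call costs: the cross-validator fits one model per parameter combination per fold, so our 3 x 3 grid with the default `numFolds=3` triggers 27 separate fits (plus a final refit of the best parameters on the whole training set). A quick sanity check of the grid size:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# The grid is just a list of parameter maps: 3 maxIter values x 3 regParam values\n", "print(len(grid))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The `cvModel` will return the best model estimated. We can now use it to see if it performed better than our previous model."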
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data_test = data_transformer \\\n", " .transform(births_test)\n", "results = cvModel.transform(data_test)\n", "\n", "print(evaluator.evaluate(results, \n", " {evaluator.metricName: 'areaUnderROC'}))\n", "print(evaluator.evaluate(results, \n", " {evaluator.metricName: 'areaUnderPR'}))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "What parameters does the best model have? The answer is a little bit convoluted, but here's how you can extract them." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "results = [\n", " (\n", " [\n", " {key.name: paramValue} \n", " for key, paramValue \n", " in params.items()\n", " ], metric\n", " ) \n", " for params, metric \n", " in zip(\n", " cvModel.getEstimatorParamMaps(), \n", " cvModel.avgMetrics\n", " )\n", "]\n", "\n", "sorted(results, \n", " key=lambda el: el[1], \n", " reverse=True)[0]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Train-Validation splitting" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Use the `ChiSqSelector` to select only the top five features, thus limiting the complexity of our model." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "selector = ft.ChiSqSelector(\n", " numTopFeatures=5, \n", " featuresCol=featuresCreator.getOutputCol(), \n", " outputCol='selectedFeatures',\n", " labelCol='INFANT_ALIVE_AT_REPORT'\n", ")\n", "\n", "logistic = cl.LogisticRegression(\n", " labelCol='INFANT_ALIVE_AT_REPORT',\n", " featuresCol='selectedFeatures'\n", ")\n", "\n", "pipeline = Pipeline(stages=[encoder, featuresCreator, selector])\n", "data_transformer = pipeline.fit(births_train)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The `TrainValidationSplit` object gets created in the same fashion as the `CrossValidator` model." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "tvs = tune.TrainValidationSplit(\n", " estimator=logistic, \n", " estimatorParamMaps=grid, \n", " evaluator=evaluator\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "As before, we fit the model to our data and calculate the results." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "tvsModel = tvs.fit(\n", " data_transformer \\\n", " .transform(births_train)\n", ")\n", "\n", "data_test = data_transformer \\\n", " .transform(births_test)\n", "results = tvsModel.transform(data_test)\n", "\n", "print(evaluator.evaluate(results, \n", " {evaluator.metricName: 'areaUnderROC'}))\n", "print(evaluator.evaluate(results, \n", " {evaluator.metricName: 'areaUnderPR'}))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Other features of PySpark ML in action" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Feature extraction" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### NLP-related feature extractors" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "A simple dataset." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "text_data = spark.createDataFrame([\n", " ['''Machine learning can be applied to a wide variety \n", " of data types, such as vectors, text, images, and \n", " structured data. 
This API adopts the DataFrame from \n", " Spark SQL in order to support a variety of data types.'''],\n", " ['''DataFrame supports many basic and structured types; \n", " see the Spark SQL datatype reference for a list of \n", " supported types. In addition to the types listed in \n", " the Spark SQL guide, DataFrame can use ML Vector types.'''],\n", " ['''A DataFrame can be created either implicitly or \n", " explicitly from a regular RDD. See the code examples \n", " below and the Spark SQL programming guide for examples.'''],\n", " ['''Columns in a DataFrame are named. The code examples \n", " below use names such as \"text,\" \"features,\" and \"label.\"''']\n", "], ['input'])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "First, we need to tokenize this text." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "tokenizer = ft.RegexTokenizer(\n", " inputCol='input', \n", " outputCol='input_arr', \n", " pattern=r'\\s+|[,.\\\"]')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The output of the tokenizer looks similar to this." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "tok = tokenizer \\\n", " .transform(text_data) \\\n", " .select('input_arr') \n", "\n", "tok.take(1)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Use the `StopWordsRemover(...)`." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "stopwords = ft.StopWordsRemover(\n", " inputCol=tokenizer.getOutputCol(), \n", " outputCol='input_stop')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The output of the method looks as follows." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "stopwords.transform(tok).select('input_stop').take(1)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Build the `NGram` model and the `Pipeline`." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "ngram = ft.NGram(n=2, \n", " inputCol=stopwords.getOutputCol(), \n", " outputCol=\"nGrams\")\n", "\n", "pipeline = Pipeline(stages=[tokenizer, stopwords, ngram])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now that we have the `pipeline`, we proceed in a very similar fashion as before." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data_ngram = pipeline \\\n", " .fit(text_data) \\\n", " .transform(text_data)\n", " \n", "data_ngram.select('nGrams').take(1)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "That's it. We got our n-grams, and we can then use them in further NLP processing."
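] }, { "cell_type": "markdown", "metadata": {}, "source": [ "For example, one common next step (a sketch only, not needed for the rest of this notebook) is to hash the n-grams into fixed-length count vectors with `HashingTF`, which downstream estimators can consume directly." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Hash each document's n-grams into a sparse, fixed-length count vector\n", "hashingTF = ft.HashingTF(\n", "    numFeatures=1024, \n", "    inputCol='nGrams', \n", "    outputCol='tf')\n", "\n", "hashingTF.transform(data_ngram).select('tf').take(1)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Discretize continuous variables" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "It is sometimes useful to *band* the values into discrete buckets."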
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import numpy as np\n", "\n", "# Generate a simple oscillating series that we will band into buckets\n", "x = np.arange(0, 100)\n", "x = x / 100.0 * np.pi * 4\n", "y = x * np.sin(x / 1.764) + 20.1234\n", "\n", "schema = typ.StructType([\n", " typ.StructField('continuous_var', \n", " typ.DoubleType(), \n", " False\n", " )\n", "])\n", "\n", "data = spark.createDataFrame([[float(e), ] for e in y], schema=schema)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Use the `QuantileDiscretizer` model to split our continuous variable into 5 buckets (see the `numBuckets` parameter)." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "discretizer = ft.QuantileDiscretizer(\n", " numBuckets=5, \n", " inputCol='continuous_var', \n", " outputCol='discretized')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's see what we got." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data_discretized = discretizer.fit(data).transform(data)\n", "\n", "data_discretized \\\n", " .groupby('discretized')\\\n", " .mean('continuous_var')\\\n", " .sort('discretized')\\\n", " .collect()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Standardizing continuous variables\n", "\n", "Create a vector representation of our continuous variable (as it is only a single float).\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "vectorizer = ft.VectorAssembler(\n", " inputCols=['continuous_var'], \n", " outputCol='continuous_vec')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Build a `StandardScaler` and a `pipeline`." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "normalizer = ft.StandardScaler(\n", " inputCol=vectorizer.getOutputCol(), \n", " outputCol='normalized', \n", " withMean=True,\n", " withStd=True\n", ")\n", "\n", "pipeline = Pipeline(stages=[vectorizer, normalizer])\n", "data_standardized = pipeline.fit(data).transform(data)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Classification" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We will now use the `RandomForestClassifier` to model the chances of survival for an infant.\n", "\n", "First, we need to cast the label feature to `DoubleType`." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import pyspark.sql.functions as func\n", "\n", "births = births.withColumn(\n", " 'INFANT_ALIVE_AT_REPORT', \n", " func.col('INFANT_ALIVE_AT_REPORT').cast(typ.DoubleType())\n", ")\n", "\n", "births_train, births_test = births \\\n", " .randomSplit([0.7, 0.3], seed=666)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We are ready to build our model." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "classifier = cl.RandomForestClassifier(\n", " numTrees=5, \n", " maxDepth=5, \n", " labelCol='INFANT_ALIVE_AT_REPORT')\n", "\n", "pipeline = Pipeline(\n", " stages=[\n", " encoder,\n", " featuresCreator, \n", " classifier])\n", "\n", "model = pipeline.fit(births_train)\n", "test = model.transform(births_test)"
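] }, { "cell_type": "markdown", "metadata": {}, "source": [ "A nice side benefit of tree ensembles is that the fitted model reports per-feature importances. An optional peek (the last pipeline stage is the fitted `RandomForestClassificationModel`; the exact numbers will vary from run to run):" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# A sparse vector of importances, indexed like the assembled 'features' column\n", "print(model.stages[-1].featureImportances)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's now see how the `RandomForestClassifier` model performs compared to the `LogisticRegression`."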
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "evaluator = ev.BinaryClassificationEvaluator(\n", " labelCol='INFANT_ALIVE_AT_REPORT')\n", "print(evaluator.evaluate(test, \n", " {evaluator.metricName: \"areaUnderROC\"}))\n", "print(evaluator.evaluate(test, \n", " {evaluator.metricName: \"areaUnderPR\"}))" ] }, { "cell_type": "markdown", "metadata": { "collapsed": true }, "source": [ "Let's now test how well a single tree would do." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "classifier = cl.DecisionTreeClassifier(\n", " maxDepth=5, \n", " labelCol='INFANT_ALIVE_AT_REPORT')\n", "pipeline = Pipeline(stages=[\n", " encoder,\n", " featuresCreator, \n", " classifier]\n", ")\n", "\n", "model = pipeline.fit(births_train)\n", "test = model.transform(births_test)\n", "\n", "evaluator = ev.BinaryClassificationEvaluator(\n", " labelCol='INFANT_ALIVE_AT_REPORT')\n", "print(evaluator.evaluate(test, \n", " {evaluator.metricName: \"areaUnderROC\"}))\n", "print(evaluator.evaluate(test, \n", " {evaluator.metricName: \"areaUnderPR\"}))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Clustering" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In this example we will use the k-means model to find similarities in the births data." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import pyspark.ml.clustering as clus\n", "\n", "kmeans = clus.KMeans(k=5, \n", " featuresCol='features')\n", "\n", "pipeline = Pipeline(stages=[\n", " encoder,\n", " featuresCreator, \n", " kmeans]\n", ")\n", "\n", "model = pipeline.fit(births_train)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Having estimated the model, let's see if we can find some differences between clusters." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "test = model.transform(births_test)\n", "\n", "test \\\n", " .groupBy('prediction') \\\n", " .agg({\n", " '*': 'count', \n", " 'MOTHER_HEIGHT_IN': 'avg'\n", " }).collect()" ] }, { "cell_type": "markdown", "metadata": { "collapsed": true }, "source": [ "In the field of NLP, problems such as topic extraction rely on clustering to detect documents with similar topics. First, let's create our dataset." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "text_data = spark.createDataFrame([\n", " ['''To make a computer do anything, you have to write a \n", " computer program. To write a computer program, you have \n", " to tell the computer, step by step, exactly what you want \n", " it to do. The computer then \"executes\" the program, \n", " following each step mechanically, to accomplish the end \n", " goal. When you are telling the computer what to do, you \n", " also get to choose how it's going to do it. That's where \n", " computer algorithms come in. The algorithm is the basic \n", " technique used to get the job done. Let's follow an \n", " example to help get an understanding of the algorithm \n", " concept.'''],\n", " ['''Laptop computers use batteries to run while not \n", " connected to mains. When we overcharge or overheat \n", " lithium ion batteries, the materials inside start to \n", " break down and produce bubbles of oxygen, carbon dioxide, \n", " and other gases. Pressure builds up, and the hot battery \n", " swells from a rectangle into a pillow shape. Sometimes \n", " the phone involved will operate afterwards. 
Other times \n", " it will die. And occasionally—kapow! To see what's \n", " happening inside the battery when it swells, the CLS team \n", " used an x-ray technology called computed tomography.'''],\n", " ['''This technology describes a technique where touch \n", " sensors can be placed around any side of a device \n", " allowing for new input sources. The patent also notes \n", " that physical buttons (such as the volume controls) could \n", " be replaced by these embedded touch sensors. In essence \n", " Apple could drop the current buttons and move towards \n", " touch-enabled areas on the device for the existing UI. It \n", " could also open up areas for new UI paradigms, such as \n", " using the back of the smartphone for quick scrolling or \n", " page turning.'''],\n", " ['''The National Park Service is a proud protector of \n", " America’s lands. Preserving our land not only safeguards \n", " the natural environment, but it also protects the \n", " stories, cultures, and histories of our ancestors. As we \n", " face the increasingly dire consequences of climate \n", " change, it is imperative that we continue to expand \n", " America’s protected lands under the oversight of the \n", " National Park Service. Doing so combats climate change \n", " and allows all American’s to visit, explore, and learn \n", " from these treasured places for generations to come. It \n", " is critical that President Obama acts swiftly to preserve \n", " land that is at risk of external threats before the end \n", " of his term as it has become blatantly clear that the \n", " next administration will not hold the same value for our \n", " environment over the next four years.'''],\n", " ['''The National Park Foundation, the official charitable \n", " partner of the National Park Service, enriches America’s \n", " national parks and programs through the support of \n", " private citizens, park lovers, stewards of nature, \n", " history enthusiasts, and wilderness adventurers. \n", " Chartered by Congress in 1967, the Foundation grew out of \n", " a legacy of park protection that began over a century \n", " ago, when ordinary citizens took action to establish and \n", " protect our national parks. Today, the National Park \n", " Foundation carries on the tradition of early park \n", " advocates, big thinkers, doers and dreamers—from John \n", " Muir and Ansel Adams to President Theodore Roosevelt.'''],\n", " ['''Australia has over 500 national parks. Over 28 \n", " million hectares of land is designated as national \n", " parkland, accounting for almost four per cent of \n", " Australia's land areas. In addition, a further six per \n", " cent of Australia is protected and includes state \n", " forests, nature parks and conservation reserves.National \n", " parks are usually large areas of land that are protected \n", " because they have unspoilt landscapes and a diverse \n", " number of native plants and animals. This means that \n", " commercial activities such as farming are prohibited and \n", " human activity is strictly monitored.''']\n", "], ['documents'])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "First, we will once again use the `RegexTokenizer` and the `StopWordsRemover` models." 
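] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The pattern we pass to `RegexTokenizer`, `\\s+|[,.\\\"]`, is a *delimiter* regex: with the default `gaps=True` the tokenizer splits on every match (it also lowercases the tokens and drops the empty strings that a plain split leaves behind). You can sanity-check such a pattern with Python's `re` module first:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import re\n", "\n", "# Split on whitespace runs, commas, periods, and double quotes\n", "print(re.split(r'\\s+|[,.\\\"]', 'To make a computer do anything, you have to write a program.'))"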
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "tokenizer = ft.RegexTokenizer(\n", " inputCol='documents', \n", " outputCol='input_arr', \n", " pattern=r'\\s+|[,.\\\"]')\n", "\n", "stopwords = ft.StopWordsRemover(\n", " inputCol=tokenizer.getOutputCol(), \n", " outputCol='input_stop')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Next in our pipeline is the `CountVectorizer`." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "countVectorizer = ft.CountVectorizer(\n", " inputCol=stopwords.getOutputCol(), \n", " outputCol=\"input_indexed\")\n", "\n", "tokenized = stopwords \\\n", " .transform(\n", " tokenizer\\\n", " .transform(text_data)\n", " )\n", " \n", "countVectorizer \\\n", " .fit(tokenized)\\\n", " .transform(tokenized)\\\n", " .select('input_indexed')\\\n", " .take(2)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We will use the `LDA` (Latent Dirichlet Allocation) model to extract the topics." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "clustering = clus.LDA(k=2, optimizer='online', featuresCol=countVectorizer.getOutputCol())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's put all these pieces together." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "pipeline = Pipeline(stages=[\n", " tokenizer, \n", " stopwords,\n", " countVectorizer, \n", " clustering]\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's see if we have properly uncovered the topics." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "topics = pipeline \\\n", " .fit(text_data) \\\n", " .transform(text_data)\n", "\n", "topics.select('topicDistribution').collect()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Regression" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In this section we will try to predict the `MOTHER_WEIGHT_GAIN`." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "features = ['MOTHER_AGE_YEARS','MOTHER_HEIGHT_IN',\n", " 'MOTHER_PRE_WEIGHT','DIABETES_PRE',\n", " 'DIABETES_GEST','HYP_TENS_PRE', \n", " 'HYP_TENS_GEST', 'PREV_BIRTH_PRETERM',\n", " 'CIG_BEFORE','CIG_1_TRI', 'CIG_2_TRI', \n", " 'CIG_3_TRI'\n", " ]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "First, we will collate all the features together and use the `ChiSqSelector` to select only the top 6 most important features." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "featuresCreator = ft.VectorAssembler(\n", " # collate all of the features into a single vector\n", " inputCols=features, \n", " outputCol='features'\n", ")\n", "\n", "selector = ft.ChiSqSelector(\n", " numTopFeatures=6, \n", " featuresCol=featuresCreator.getOutputCol(), \n", " outputCol=\"selectedFeatures\", \n", " labelCol='MOTHER_WEIGHT_GAIN'\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In order to predict the weight gain, we will use the gradient-boosted trees regressor." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import pyspark.ml.regression as reg\n", "\n", "regressor = reg.GBTRegressor(\n", " maxIter=15, \n", " maxDepth=3,\n", " # use the selector's output rather than the full feature vector\n", " featuresCol='selectedFeatures',\n", " labelCol='MOTHER_WEIGHT_GAIN')"
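] }, { "cell_type": "markdown", "metadata": {}, "source": [ "If you want a point of reference for the boosted trees, a plain `LinearRegression` slots into exactly the same pipeline. A sketch of such a baseline (we do not fit it here):" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# A linear baseline that uses the same selected features and label\n", "baseline = reg.LinearRegression(\n", "    labelCol='MOTHER_WEIGHT_GAIN',\n", "    featuresCol='selectedFeatures')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Finally, again, we put it all together into a `Pipeline`."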
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "pipeline = Pipeline(stages=[\n", " featuresCreator, \n", " selector,\n", " regressor])\n", "\n", "weightGain = pipeline.fit(births_train)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Having created the `weightGain` model, let's see if it performs well on our testing data." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "evaluator = ev.RegressionEvaluator(\n", " predictionCol=\"prediction\", \n", " labelCol='MOTHER_WEIGHT_GAIN')\n", "\n", "print(evaluator.evaluate(\n", " weightGain.transform(births_test), \n", " {evaluator.metricName: 'r2'}))" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.10" } }, "nbformat": 4, "nbformat_minor": 1 }