从Pyspark UDF调用另一个自定义Python函数的方法步骤

 更新时间:2023年11月02日 11:11:39   作者:鲸落_  
PySpark,通常称为Apache Spark的Python API,是为分布式数据处理而创建的,使用UDF,可以扩展和定制 PySpark 的功能以满足某些需求,在本文中,我们将学习如何从Pyspark UDF调用另一个自定义Python函数,需要的朋友可以参考下

PySpark,通常称为 Apache Spark 的 Python API,是为分布式数据处理而创建的。它使用户能够高效且可扩展地对大型数据集进行复杂的计算和转换。用户定义函数 (UDF),允许用户创建自己独特的函数并将其应用到 Spark DataFrame 或 RDD,这是 PySpark 的主要功能之一 。使用 UDF,可以扩展和定制 PySpark 的功能以满足某些需求。在本文中,我们将学习如何从 Pyspark UDF 调用另一个自定义 Python 函数。

从 Pyspark UDF 调用另一个自定义 Python函数

Python 编码的 PySpark UDF 提供了调用其他Python 函数的能力,无论它们是内置函数还是来自外部库的用户定义函数。通过使用户能够利用现有的 Python 代码,此功能提高了 UDF 的模块化和可重用性。在分布式 PySpark 环境中,用户可以轻松实现特定领域的逻辑、执行具有挑战性的计算或使用尖端算法。用户可以通过从 PySpark UDF 调用 Python 函数来充分利用 Python 庞大的库和功能生态系统的全部潜力。

从 PySpark UDF 调用另一个自定义 Python 函数的步骤

让我们看看从 Pyspark UDF 调用另一个自定义 Python 函数的分步过程。

第1步:导入必要的模块

首先,从“pyspark.sql.functions” 模块导入“udf” ,该模块提供了处理 Spark DataFrame 的工具。

from pyspark.sql.functions import udf

第 2 步:启动 Spark 会话

接下来,通过导入必要的 Spark 模块来创建 Spark 会话。

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

第 3 步:创建数据框

下一步是创建一个数据帧,用于在 Spark 中执行操作。

data = [("Marry", 25), ("Sunny", 30), ("Ram", 35)]
df = spark.createDataFrame(data, ["name", "age"])

第 4 步:定义自定义 Python 函数

然后定义我们希望从 PySpark UDF 调用的自定义 Python 函数。我们可以在此函数中使用我们需要的任何逻辑或计算。例如,将字符串转换为大写字符串的函数。

def to_uppercase(string):
    return string.upper()

第 5 步: 创建 PySpark UDF

创建自定义 Python 函数后,使用 “pyspark.sql.functions” 模块中的 UDF 函数构造 PySpark UDF。 “udf()” 函数应接收自定义 Python 函数作为参数。自定义函数注册为 UDF,以便它可以应用于 DataFrame 列。

to_uppercase_udf = udf(to_uppercase)

步骤 6:将 UDF 应用到 DataFrame

创建 PySpark UDF 后,使用 “withColumn()” 函数将其应用到 DataFrame 列。在 DataFrame 中,此方法添加新列或删除现有列。DataFrame 的每一行都会调用 UDF 一次,将自定义 Python 函数应用于指定列并生成所需的结果。

df = df.withColumn("name_uppercase", to_uppercase_udf(df["name"]))

第7步: 显示数据框

最后,我们将使用 “show()” 函数显示数据框以查看对其所做的更改。

df.show()

按照这些说明,我们可以通过从 PySpark UDF 调用另一个自定义 Python 函数来在 PySpark DataFrame 上执行自定义计算和转换。

从 PySpark UDF 调用另一个自定义 Python 函数的示例

现在,让我们看看从 Pyspark UDF 调用 Python 自定义函数的几个不同示例。

示例 1:将 DataFrame 列转换为大写

在此示例中,我们将使用 Pyspark 创建一个包含人员姓名和年龄的Spark 数据框 “df” 。然后我们将定义一个自定义 Python 函数“ to_uppercase()”,它将Python 字符串作为参数并将其转换为大写并将结果存储在该数据帧的新列中。然后我们使用 Pyspark 的“ udf() ”函数创建 Pyspark UDF。

Python3

# 导入模块
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf

# 定义自定义Python函数
def to_uppercase(string):
	return string.upper()

# 创建一个SparkSession
spark = SparkSession.builder.getOrCreate()

# 创建一个DataFrame
data = [("Marry", 25), ("Sunny", 30), ("Ram", 35)]
df = spark.createDataFrame(data, ["name", "age"])

# 创建 PySpark UDF
to_uppercase_udf = udf(to_uppercase)

# 将UDF应用于“name”列
df = df.withColumn("name_uppercase", to_uppercase_udf(df["name"]))

# 用于显示DataFrame的函数
df.show()

输出:

示例 2: 调用组合多个 DataFrame 列的自定义 Python 函数

在此示例中,我们将创建一个包含 2 列的数据框 - ' first_name ' 和 ' last_name '。然后创建一个 Python 自定义函数“ combine_columns ”,它将“first_name”和“last_name”作为参数,并返回一个列,将它们组合在一起以创建“ full_name”

Python3

# 导入模块
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf

# 自定义Python函数
def combine_columns(col1, col2):
	return col1 + " " + col2

# 创建SparkSession
spark = SparkSession.builder.getOrCreate()

# 创建一个DataFrame
data = [("John", "Doe"), ("Ram", "Kumar"), ("Smith", "Jones")]
df = spark.createDataFrame(data, ["first_name", "last_name"])

# 制作PySpark UDF
combine_columns_udf = udf(combine_columns)

# 将 UDF应用于“first_name”和“last_name”列
df = df.withColumn("full_name", combine_columns_udf(df["first_name"], df["last_name"]))

# 用于显示DataFrame的函数
df.show()

输出:

示例 3:使用外部库从 PySpark UDF 调用自定义 Python 函数

对于更复杂的计算,PySpark 使我们能够在定制函数中使用外部 Python 库。假设我们希望使用模糊匹配库 “fuzzywuzzy” 和名为 “calculate_similarity” 的自定义 Python 方法来比较两个文本之间的相似度。

在此示例中,我们从Python 中的 fuzzywuzzy 库导入“fuzz”模块,并使用“ fuzz.ratio() ”函数来确定两个文本之间的相似程度。我们创建了独特的 Python 方法“ calculate_similarity() ”来使用输入字符串调用 “fuzz.ratio()” 算法。使用 “udf()” 函数,我们构建一个名为 “similarity_udf” 的 UDF 并定义输入和输出类型。最后,我们使用 “withColumn()” 方法将 UDF 应用于“string1”和“string2”列,并显示具有相似率的结果 DataFrame。

Python3

# 导入模块
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
from fuzzywuzzy import fuzz

# 创建SparkSession
spark = SparkSession.builder.getOrCreate()

# 使用列“string1”和“string2”获取的示例DataFrame
data = [("apple", "apples"), ("banana", "bananas"), ("cat", "dog")]
df = spark.createDataFrame(data, ["string1", "string2"])

# 创建自定义Python函数
def calculate_similarity(str1, str2):
	return fuzz.ratio(str1, str2)

# 从自定义函数创建自定义项
similarity_udf = udf(calculate_similarity, IntegerType())

# 应用UDF计算相似性
df.withColumn("similarity", similarity_udf(df["string1"], df["string2"])).show()

输出:

示例 4:应用具有复杂逻辑的自定义 Python 函数

让我们看一个示例,其中有一个 DataFrame,其中有一列表示句子的字符串,并且我们希望使用名为“ count_words” 的自定义 Python 函数来确定每个短语中存在多少个单词。

在此图中,自定义 Python 函数 “count_words” 使用 “split()” 方法将输入文本分解为单词,并使用 “len()” 函数获取单词计数。使用 “udf()” 函数,我们构建一个名为 “count_udf” 的 UDF并定义输入和输出类型。最后,我们使用 “withColumn()” 方法将 UDF 应用到 “sentence” 列,并显示带有字数统计的结果 DataFrame。

Python3

# 导入模块
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# 创建 SparkSession
spark = SparkSession.builder.getOrCreate()

# 具有列“sentence”的示例DataFrame
data = [("Hello, PySpark!",), ("PySpark is great in today's world",),
		("Spark DataFrames are powerful in python to work on",)]
df = spark.createDataFrame(data, ["sentence"])

# 创建自定义Python函数
def count_words(sentence):
	return len(sentence.split())


# 从自定义函数创建自定义项
count_udf = udf(count_words, IntegerType())

# 应用UDF计算每句话中的单词
df.withColumn("word_count", count_udf(df["sentence"])).show()

输出:

以上就是从Pyspark UDF调用另一个自定义Python函数的方法步骤的详细内容,更多关于Pyspark UDF调用Python函数的资料请关注脚本之家其它相关文章!

相关文章

  • python深度学习tensorflow实例数据下载与读取

    python深度学习tensorflow实例数据下载与读取

    这篇文章主要为大家介绍了python深度学习tensorflow实例数据下载与读取示例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-06-06
  • Python学习之路之pycharm的第一个项目搭建过程

    Python学习之路之pycharm的第一个项目搭建过程

    这篇文章主要介绍了Python学习之路之pycharm的第一个项目搭建过程,本文通过图文并茂的形式给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-06-06
  • python破解zip加密文件的方法

    python破解zip加密文件的方法

    这篇文章主要介绍了python破解zip加密文件的方法,本文图文并茂给大家介绍的非常详细,需要的朋友可以参考下
    2018-05-05
  • python3使用matplotlib绘制散点图

    python3使用matplotlib绘制散点图

    这篇文章主要为大家详细介绍了python3使用matplotlib绘制散点图,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2019-03-03
  • 如何用python处理excel表格

    如何用python处理excel表格

    在本篇文章里小编给大家整理了关于python处理excel表格的详细步骤内容,需要的朋友们可以参考下。
    2020-06-06
  • Python关于__name__属性的含义和作用详解

    Python关于__name__属性的含义和作用详解

    在本篇文章里小编给大家分享的是关于Python关于__name__属性的含义和作用知识点,需要的朋友们可以参考下。
    2020-02-02
  • python读取图片任意范围区域

    python读取图片任意范围区域

    这篇文章主要为大家详细介绍了python读取图片任意范围区域,以一维数组形式返回,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2019-01-01
  • Python暴力破解Mysql数据的示例

    Python暴力破解Mysql数据的示例

    这篇文章主要介绍了Python暴力破解Mysql数据的示例,帮助大家更好的理解和使用MySQL,感兴趣的朋友可以了解下
    2020-11-11
  • Python中按值来获取指定的键

    Python中按值来获取指定的键

    今天小编就为大家分享一篇关于Python中按值来获取指定的键,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧
    2019-03-03
  • Python 多线程超详细到位总结

    Python 多线程超详细到位总结

    线程在程序中是独立的、并发的执行流。与分隔的进程相比,进程中线程之间的隔离程度要小,它们共享内存、文件句柄和其他进程应有的状态。线程的划分尺度小于进程,使多线程程序的并发性高。进程在执行过程中拥有独立内存单元,而多个线程共享内存,从而提升程序运行效率
    2021-11-11

最新评论