Spark pandas UDF with a broadcast model seems to fail
Hi there, please see the code snippets and the error below. Any idea how to fix this?
Thanks,
Pradeep
create sample data frame:
from pyspark.sql.types import StringType

x = '{ "name":"John", "age":30, "city":"Old York"}'
y = '{ "name":"John", "age":30, "city":"New York"}'
z = '{ "name":"John", "age":30, "city":" York"}'

df = spark.createDataFrame([x, y, z], StringType()).toDF("json_cols")
display(df)
create UDF:
# General Imports
import logging
import time
from datetime import date, timedelta
import os
os.environ["GIT_PYTHON_REFRESH"] = "quiet"
import numpy as np
import pandas as pd
import math
from typing import Iterator
# Dimensionality
from pyspark.ml.feature import PCA
from pyspark.ml.functions import vector_to_array
# Databricks and Spark Imports
import pyspark
import pyspark.sql.functions as F
import mlflow
import mlflow.pyfunc
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.types import *
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer, VectorAssembler, Normalizer, FeatureHasher, ElementwiseProduct
from dataclasses import asdict
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.stat import Summarizer
# imports for newton embeds
import datasets
import torch
from transformers import AutoModel, AutoTokenizer
from sentence_transformers import SentenceTransformer
from sentence_transformers.util import cos_sim
from transformers import LongformerTokenizer, LongformerModel
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import ArrayType, DoubleType
from pyspark.sql.functions import col
# Load the model on the driver and broadcast it to the executors
model = SentenceTransformer("nomic-ai/nomic-embed-text-v1-ablated", trust_remote_code=True)
broadcast_model = sc.broadcast(model)
@F.pandas_udf(returnType=ArrayType(DoubleType()))
def nomicEmbed(c: pd.Series) -> pd.Series:
    # model = SentenceTransformer("nomic-ai/nomic-embed-text-v1-ablated", trust_remote_code=True)
    embeddings = broadcast_model.value.encode(c.tolist(), batch_size=8)
    return pd.Series(embeddings.tolist())
run it:

# Apply the UDF and select the embeddings as a new column in the data frame
embed_df = df.limit(100).withColumn("embeddings", nomicEmbed(col("json_cols")))

# Display the result
display(embed_df)
error:
If I instantiate the model inside the UDF it seems to work, but that's not scalable. The code above fails with the traceback below when the model is instantiated outside the UDF:
File "", line 19, in nomicEmbed
File "/databricks/spark/python/pyspark/broadcast.py", line 288, in value
self._value = self.load_from_path(self._path)
File "/databricks/spark/python/pyspark/broadcast.py", line 235, in load_from_path
return self.load(f)
File "/databricks/spark/python/pyspark/broadcast.py", line 274, in load
gc.enable()
ModuleNotFoundError: No module named 'transformers_modules'
I haven't used Spark in a while, but this seems like an issue with downloading the right packages. Are you sure transformers is installed?
Hi Zach, I think I have the right packages, because it works fine if I instantiate the model inside the UDF like this. But I can't do that across the whole data frame: it runs out of memory very quickly, since the model is instantiated on every call to the UDF:
@F.pandas_udf(returnType=ArrayType(DoubleType()))
def nomicEmbed(c: pd.Series) -> pd.Series:
    # Model built inside the UDF, so it is reloaded on every call
    model = SentenceTransformer("nomic-ai/nomic-embed-text-v1-ablated", trust_remote_code=True)
    embeddings = model.encode(c.tolist(), batch_size=8)
    return pd.Series(embeddings.tolist())
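What I'm really after is something that loads the model once per task instead of once per call, e.g. an iterator pandas UDF. A rough sketch of the idea (nomicEmbedIter is just a name I made up, and I haven't verified this on my cluster):

from typing import Iterator

import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import ArrayType, DoubleType
from sentence_transformers import SentenceTransformer

@pandas_udf(ArrayType(DoubleType()))
def nomicEmbedIter(batches: Iterator[pd.Series]) -> Iterator[pd.Series]:
    # Runs once per task, then the model is reused for every batch in that task
    model = SentenceTransformer("nomic-ai/nomic-embed-text-v1-ablated", trust_remote_code=True)
    for c in batches:
        embeddings = model.encode(c.tolist(), batch_size=8)
        yield pd.Series(embeddings.tolist())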
If it helps: if I pick this model instead, the same code works fine:
model = SentenceTransformer('thenlper/gte-large')
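i.e. with that model the broadcast itself goes through (worth noting it loads without trust_remote_code=True):

# Same broadcast pattern as above, only the model swapped
model = SentenceTransformer('thenlper/gte-large')
broadcast_model = sc.broadcast(model)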
Can you post the full error log from above?
Hey - curious if you resolved this?
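My guess, in case anyone hits this later: with trust_remote_code=True the model's custom classes are loaded from a dynamically generated transformers_modules package that exists only on the driver, so the executors can't find that package when they unpickle the broadcast, hence the ModuleNotFoundError. That would also explain why thenlper/gte-large broadcasts fine: it uses only built-in classes. One way around it is to skip the broadcast and cache the model per worker; a sketch (get_model is a made-up helper, and how often the model actually reloads depends on how Spark reuses its Python workers):

_model = None  # per-worker cache

def get_model():
    # Built on the worker itself, so the custom classes never have to be pickled
    global _model
    if _model is None:
        _model = SentenceTransformer("nomic-ai/nomic-embed-text-v1-ablated", trust_remote_code=True)
    return _model

@F.pandas_udf(returnType=ArrayType(DoubleType()))
def nomicEmbed(c: pd.Series) -> pd.Series:
    embeddings = get_model().encode(c.tolist(), batch_size=8)
    return pd.Series(embeddings.tolist())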