Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Sparklyr split string (to string)

Trying to split a string in sparklyr then use it for joins/filtering

I tried the suggested approach of tokenizing the string and then separating it into new columns. Here is a reproducible example (note that after copy_to my NA turns into the string "NA", so I have to translate it back to an actual NA; is there a way to avoid having to do that?)

x <- data.frame(Id=c(1,2,3,4),A=c('A-B','A-C','A-D',NA))
df <- copy_to(sc,x,'df')

df %>%  mutate(A = ifelse(A=='NA',NA,A)) %>% ft_regex_tokenizer(input.col="A", output.col="B", pattern="-",to_lower_case=F) %>% 
    sdf_separate_column("B", into=c("C", "D")) %>% filter(C=='A') 

The problem is that if I try to filter on the newly created columns (e.g. %>% filter(C=='A')) or join on them, I get an error; see below:

Error : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 367.0 failed 4 times, most recent failure: Lost task 0.3 in stage 367.0 (TID 5062, 10.139.64.4, executor 0): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$createTransformFunc$2: (string) => array<string>)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:622)
    at org.apache.spark.sql.execution.collect.UnsafeRowBatchUtils$.encodeUnsafeRows(UnsafeRowBatchUtils.scala:51)
    at org.apache.spark.sql.execution.collect.Collector$$anonfun$2.apply(Collector.scala:148)
    at org.apache.spark.sql.execution.collect.Collector$$anonfun$2.apply(Collector.scala:147)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:139)
    at org.apache.spark.scheduler.Task.run(Task.scala:112)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:497)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1432)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:503)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
    at java.util.regex.Matcher.getTextLength(Matcher.java:1283)
    at java.util.regex.Matcher.reset(Matcher.java:309)
    at java.util.regex.Matcher.<init>(Matcher.java:229)
    at java.util.regex.Pattern.matcher(Pattern.java:1093)
    at java.util.regex.Pattern.split(Pattern.java:1206)
    at java.util.regex.Pattern.split(Pattern.java:1273)
    at scala.util.matching.Regex.split(Regex.scala:526)
    at org.apache.spark.ml.feature.RegexTokenizer$$anonfun$createTransformFunc$2.apply(Tokenizer.scala:144)
    at org.apache.spark.ml.feature.RegexTokenizer$$anonfun$createTransformFunc$2.apply(Tokenizer.scala:141)
    ... 15 more

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2100)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2088)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2087)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2087)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1076)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1076)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1076)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2319)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2267)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2255)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:873)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2252)
    at org.apache.spark.sql.execution.collect.Collector.runSparkJobs(Collector.scala:259)
    at org.apache.spark.sql.execution.collect.Collector.collect(Collector.scala:269)
    at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:69)
    at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:75)
    at org.apache.spark.sql.execution.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:497)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollectResult(limit.scala:48)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectResult(Dataset.scala:2827)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3439)
    at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2794)
    at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2794)
    at org.apache.spark.sql.Dataset$$anonfun$54.apply(Dataset.scala:3423)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withCustomExecutionEnv$1.apply(SQLExecution.scala:99)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:228)
    at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:85)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:158)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withAction(Dataset.scala:3422)
    at org.apache.spark.sql.Dataset.collect(Dataset.scala:2794)
    at sparklyr.Utils$.collect(utils.scala:200)
    at sparklyr.Utils.collect(utils.scala)
    at sun.reflect.GeneratedMethodAccessor577.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at sparklyr.Invoke.invoke(invoke.scala:139)
    at sparklyr.StreamHandler.handleMethodCall(stream.scala:123)
    at sparklyr.StreamHandler.read(stream.scala:66)
    at sparklyr.BackendHandler.channelRead0(handler.scala:51)
    at sparklyr.BackendHandler.channelRead0(handler.scala:4)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:284)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:138)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkExcepti
In addition: Warning messages:
1: The parameter `input.col` is deprecated and will be removed in a future release. Please use `input_col` instead. 
2: The parameter `output.col` is deprecated and will be removed in a future release. Please use `output_col` instead

Not sure why, as the type of the created columns is "StringType" according to sdf_schema.

Is there a solution using sparklyr to actually separate the values into columns that I can use later as strings, without having to write the frame out to a file or collect it to the driver node?

like image 587
hjkop Avatar asked Mar 25 '26 16:03

hjkop


1 Answers

Using Spark ML transformers is not a good choice here. Instead you should use the split function:

df %>% 
  mutate(B = split(A, "-")) %>% 
  sdf_separate_column("B", into = c("C", "D")) %>%
  filter(C %IS NOT DISTINCT FROM% "A") 
# Source: spark<?> [?? x 5]
     Id A     B          C     D    
  <dbl> <chr> <list>     <chr> <chr>
1     1 A-B   <list [2]> A     B    
2     2 A-C   <list [2]> A     C    
3     3 A-D   <list [2]> A     D  

or regexp_extract

pattern <- "^(.*)-(.*)$"

df %>% 
   mutate(
     C = regexp_extract(A, pattern, 1),
     D = regexp_extract(A, pattern, 2)
   ) %>%
   filter(C %IS NOT DISTINCT FROM% "A") 
# Source: spark<?> [?? x 4]
     Id A     C     D    
  <dbl> <chr> <chr> <chr>
1     1 A-B   A     B    
2     2 A-C   A     C    
3     3 A-D   A     D    

Nonetheless, if you want to make RegexTokenizer work, you have to handle NULLs (NA in external R types) first. This can be done, for example, with coalesce:

tokenizer <- ft_regex_tokenizer(
  sc, input_col = "A", output_col = "B",
  pattern = "-", to_lower_case = F
)

df %>%  
  mutate(A = coalesce(A, "")) %>% 
  ml_transform(tokenizer, .) %>%
  sdf_separate_column("B", into=c("C", "D")) %>%
  filter(C %IS NOT DISTINCT FROM% "A")
# Source: spark<?> [?? x 5]
     Id A     B          C     D    
  <dbl> <chr> <list>     <chr> <chr>
1     1 A-B   <list [2]> A     B    
2     2 A-C   <list [2]> A     C    
3     3 A-D   <list [2]> A     D    

or by removing missing data first:

df %>%  
  # or filter(!is.na(A))
  na.omit(columns=c("A")) %>%                      
  ml_transform(tokenizer, .) %>%
  sdf_separate_column("B", into=c("C", "D")) %>%
  filter(C %IS NOT DISTINCT FROM% "A")
* Dropped 1 rows with 'na.omit' (4 => 3)
# Source: spark<?> [?? x 5]
     Id A     B          C     D    
  <dbl> <chr> <list>     <chr> <chr>
1     1 A-B   <list [2]> A     B    
2     2 A-C   <list [2]> A     C    
3     3 A-D   <list [2]> A     D  
like image 135
zero323 Avatar answered Mar 27 '26 05:03

zero323