Pregunta Llamar a la función Java / Scala desde una tarea


Fondo

Mi pregunta original aquí fue Por qué usar DecisionTreeModel.predict función de mapa interior plantea una excepción? y está relacionado con ¿Cómo generar tuplas de (etiqueta original, etiqueta predicha) en Spark con MLlib?

Cuando usamos la API de Scala una forma recomendada de obtener predicciones para RDD[LabeledPoint] utilizando DecisionTreeModel es simplemente mapear RDD:

val labelAndPreds = testData.map { point =>
  val prediction = model.predict(point.features)
  (point.label, prediction)
}

Lamentablemente, un enfoque similar en PySpark no funciona tan bien:

labelsAndPredictions = testData.map(
    lambda lp: (lp.label, model.predict(lp.features))
labelsAndPredictions.first()

Excepción: parece que intenta hacer referencia a SparkContext desde una variable, acción o transforamción de difusión. SparkContext solo se puede usar en el controlador, no en el código que se ejecuta en los trabajadores. Para más información, ver SPARK-5063.

En lugar de eso documentación oficial recomienda algo como esto:

predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)

Entonces, ¿qué está pasando aquí? No hay una variable de difusión aquí y API de Scala define predict como sigue:

/**
 * Predict values for a single data point using the model trained.
 *
 * @param features array representing a single data point
 * @return Double prediction from the trained model
 */
def predict(features: Vector): Double = {
  topNode.predict(features)
}

/**
 * Predict values for the given data set using the model trained.
 *
 * @param features RDD representing data points to be predicted
 * @return RDD of predictions for each of the given data points
 */
def predict(features: RDD[Vector]): RDD[Double] = {
  features.map(x => predict(x))
}

por lo menos, a primera vista, llamar desde la acción o la transformación no es un problema, ya que la predicción parece ser una operación local.

Explicación

Después de excavar descubrí que la fuente del problema es una JavaModelWrapper.call método invocado desde DecisionTreeModel.predict. Eso acceso  SparkContext que se requiere para llamar a la función Java:

callJavaFunc(self._sc, getattr(self._java_model, name), *a)

Pregunta

En caso de DecisionTreeModel.predict hay una solución alternativa recomendada y todo el código requerido ya es parte de la API de Scala, pero ¿hay alguna forma elegante de manejar un problema como este en general?

Solo las soluciones en las que puedo pensar en este momento son bastante pesadas:

  • presionando todo hacia abajo a JVM extendiendo las clases de Spark a través de Conversiones implícitas o agregando algún tipo de envoltorios
  • utilizando la puerta de enlace Py4j directamente

32
2017-07-28 18:54


origen


Respuestas:


La comunicación usando la puerta de enlace Py4J predeterminada simplemente no es posible. Para entender por qué tenemos que echar un vistazo al siguiente diagrama del documento PySpark Internals [1]:

enter image description here

Como la puerta de enlace Py4J se ejecuta en el controlador, no es accesible para los intérpretes de Python que se comunican con los trabajadores de JVM a través de sockets (consulte, por ejemplo, PythonRDD / rdd.py)

Teóricamente, podría ser posible crear una puerta de enlace Py4J por separado para cada trabajador, pero en la práctica es poco probable que sea útil. Ignorando problemas como la confiabilidad Py4J simplemente no está diseñado para realizar tareas intensivas de datos.

¿Hay alguna solución?

  1. Utilizando API de fuentes de datos Spark SQL para envolver el código JVM

    Pros: Alto nivel admitido, no requiere acceso a la API interna de PySpark

    Contras: Relativamente detallado y no muy bien documentado, limitado principalmente a los datos de entrada

  2. Operando en DataFrames usando las UDF de Scala.

    Pros: Fácil de implementar (ver Spark: ¿Cómo mapear Python con Scala o Java User Defined Functions?), no hay conversión de datos entre Python y Scala si los datos ya están almacenados en un DataFrame, acceso mínimo a Py4J

    Contras: Requiere acceso a la puerta de enlace Py4J y métodos internos, limitado a Spark SQL, difícil de depurar, no compatible

  3. Crear una interfaz Scala de alto nivel de forma similar a cómo se hace en MLlib.

    Pros: Flexible, capacidad de ejecutar código complejo arbitrario. Se puede donar directamente en RDD (ver por ejemplo Envoltorios modelo MLlib) o con DataFrames (ver Cómo usar una clase Scala dentro de Pyspark) La última solución parece ser mucho más amigable ya que todos los detalles de servicio ya están manejados por la API existente.

    Contras: Bajo nivel, conversión de datos requerida, lo mismo que UDFs requiere acceso a Py4J y API interna, no compatible

    Algunos ejemplos básicos se pueden encontrar en Transformando PySpark RDD con Scala

  4. Utilizar la herramienta de gestión de flujo de trabajo externa para alternar entre trabajos de Python y Scala / Java y pasar datos a un DFS.

    Pros: Fácil de implementar, cambios mínimos en el código mismo

    Contras: Costo de leer / escribir datos (Alluxio?)

  5. Usando compartido SQLContext (ver por ejemplo Apache Zeppelin o Livio) para pasar datos entre los idiomas de los invitados utilizando tablas temporales registradas.

    Pros: Muy adecuado para el análisis interactivo

    Contras: No tanto para trabajos por lotes (Zeppelin) o puede requerir orquestación adicional (Livy)


  1. Joshua Rosen. (2014, agosto 04) PySpark Internals. Obtenido de https://cwiki.apache.org/confluence/display/SPARK/PySpark+Internals

36
2017-12-22 09:14