Вопрос:
Я новичок в scala/spark. Я работаю над приложением scala/java на искры, пытаясь прочитать некоторые данные из таблицы улей, а затем суммировать все значения столбцов для каждой строки. например, рассмотрим следующие DF:
+———+-+-+-+-+-+-+ | address|a|b|c|d|e|f| +———+-+-+-+-+-+-+ |Newyork |1|0|1|0|1|1| | LA |0|1|1|1|0|1| |Chicago |1|1|0|0|1|1| +———+-+-+-+-+-+-+
Я хочу суммировать все 1 во всех строках и получить total.ie приведенная выше сумма данных данных всех столбцов должна быть 12 (так как 12 чисел из 1 во всех рядах объединены)
Я попытался сделать это:
var count = 0 DF.foreach( x => { count = count + Integer.parseInt(x.getAs[String](«a»)) + Integer.parseInt(x.getAs[String](«b»)) + Integer.parseInt(x.getAs[String](«c»)) + Integer.parseInt(x.getAs[String](«d»)) + Integer.parseInt(x.getAs[String](«e»)) + Integer.parseInt(x.getAs[String](«f»)) })
Когда я запускаю вышеуказанный код, значение count все равно равно zero. Я думаю, что это имеет какое-то отношение к запуску приложения в кластере. Итак, объявление переменной и добавление к ней не работает для меня, поскольку я должен запускать свое приложение в кластере. Я также попытался объявить статическую переменную в отдельном классе java и добавить к ней – это дает мне тот же результат.
Насколько мне известно, должен быть простой способ добиться этого, используя встроенные функции, доступные в библиотеках искры/скала.
Что было бы эффективным способом достижения этого? Любая помощь будет оценена по достоинству.
Спасибо.
PS: Я использую Spark 1.6.
Лучший ответ:
Вы можете суммировать значения столбцов во-первых, которые возвращают один кадр данных Row sum s, тогда вы можете преобразовать этот ряд в Seq и суммировать значения вверх:
val sum_cols = df.columns.tail.map(x => sum(col(x))) df.agg(sum_cols.head, sum_cols.tail: _*).first.toSeq.asInstanceOf[Seq[Long]].sum // res9: Long = 12 df.agg(sum_cols.head, sum_cols.tail: _*).show +——+——+——+——+——+——+ |sum(a)|sum(b)|sum(c)|sum(d)|sum(e)|sum(f)| +——+——+——+——+——+——+ | 2| 2| 2| 1| 2| 3| +——+——+——+——+——+——+ Ответ №1
Вот альтернативный подход:
сначала позвольте подготовить агрегирующую функцию:
scala> val f = df.drop(«address»).columns.map(col).reduce((c1, c2) => c1 + c2) f: org.apache.spark.sql.Column = (((((a + b) + c) + d) + e) + f)
получить сумму как DataFrame:
scala> df.agg(sum(f).alias(«total»)).show +——+ |total| +——+ | 12| +——+
получить сумму как Long число:
scala> df.agg(sum(f)).first.getLong(0) res39: Long = 12