Сначала проверьте ваше соединение.
Затем, если вы хотите получить точное значение из базы данных, тогда вы должны написать:
$username = $_POST['username'];
$password = $_POST['password'];
$result = mysql_query("SELECT * FROM Users WHERE UserName =`$usernam`");
Или вы хотите получить LIKE
, то вы должны написать:
$result = mysql_query("SELECT * FROM Users WHERE UserName LIKE '%$username%'");
Spark 2.1 +
Вы можете использовать функцию from_json
:
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._
val schema = StructType(Seq(
StructField("k", StringType, true), StructField("v", DoubleType, true)
))
df.withColumn("jsonData", from_json($"jsonData", schema))
Spark 1.6 +
Вы можете используйте get_json_object
, который берет столбец и путь:
import org.apache.spark.sql.functions.get_json_object
val exprs = Seq("k", "v").map(
c => get_json_object($"jsonData", s"$$.$c").alias(c))
df.select($"*" +: exprs: _*)
и извлекает поля для отдельных строк, которые могут быть добавлены к ожидаемым типам.
Аргумент path
выраженный с использованием точечного синтаксиса, с ведущим $.
, обозначающим корень документа (поскольку используемый выше код использует интерполяцию строк $
, поэтому $$.
).
Spark & lt; = 1.5:
Возможно ли это?
blockquote>Насколько я знаю, это невозможно. Вы можете попробовать что-то похожее на это:
val df = sc.parallelize(Seq( ("1", """{"k": "foo", "v": 1.0}""", "some_other_field_1"), ("2", """{"k": "bar", "v": 3.0}""", "some_other_field_2") )).toDF("key", "jsonData", "blobData")
Я предполагаю, что поле
blob
не может быть представлено в JSON. В противном случае вы откажитесь от разделения и соединения:import org.apache.spark.sql.Row val blobs = df.drop("jsonData").withColumnRenamed("key", "bkey") val jsons = sqlContext.read.json(df.drop("blobData").map{ case Row(key: String, json: String) => s"""{"key": "$key", "jsonData": $json}""" }) val parsed = jsons.join(blobs, $"key" === $"bkey").drop("bkey") parsed.printSchema // root // |-- jsonData: struct (nullable = true) // | |-- k: string (nullable = true) // | |-- v: double (nullable = true) // |-- key: long (nullable = true) // |-- blobData: string (nullable = true)
Альтернативный (более дешевый, хотя и более сложный) подход - использовать UDF для разбора JSON и вывода столбца
struct
илиmap
. Например, что-то вроде этого:import net.liftweb.json.parse case class KV(k: String, v: Int) val parseJson = udf((s: String) => { implicit val formats = net.liftweb.json.DefaultFormats parse(s).extract[KV] }) val parsed = df.withColumn("parsedJSON", parseJson($"jsonData")) parsed.show // +---+--------------------+------------------+----------+ // |key| jsonData| blobData|parsedJSON| // +---+--------------------+------------------+----------+ // | 1|{"k": "foo", "v":...|some_other_field_1| [foo,1]| // | 2|{"k": "bar", "v":...|some_other_field_2| [bar,3]| // +---+--------------------+------------------+----------+ parsed.printSchema // root // |-- key: string (nullable = true) // |-- jsonData: string (nullable = true) // |-- blobData: string (nullable = true) // |-- parsedJSON: struct (nullable = true) // | |-- k: string (nullable = true) // | |-- v: integer (nullable = false)
Функция from_json
- это именно то, что вы ищете. Ваш код будет выглядеть примерно так:
val df = sqlContext.read
.format("org.apache.spark.sql.cassandra")
.options(Map("table" -> "mytable", "keyspace" -> "ks1"))
.load()
//You can define whatever struct type that your json states
val schema = StructType(Seq(
StructField("key", StringType, true),
StructField("value", DoubleType, true)
))
df.withColumn("jsonData", from_json(col("jsonData"), schema))
, лежащий в основе JSON String, является
"{ \"column_name1\":\"value1\",\"column_name2\":\"value2\",\"column_name3\":\"value3\",\"column_name5\":\"value5\"}";
Ниже приведен скрипт для фильтрации JSON и загрузки необходимых данных в Cassandra.
sqlContext.read.json(rdd).select("column_name1 or fields name in Json", "column_name2","column_name2")
.write.format("org.apache.spark.sql.cassandra")
.options(Map("table" -> "Table_name", "keyspace" -> "Key_Space_name"))
.mode(SaveMode.Append)
.save()