Введение в графы и Neo4j. Обработка графов в Spark

Illustration of a person with a scarf working on a laptop with abstract molecular or network structures in the background.
Обновлено: | Опубликовано:

К графу надо обращаться «ваше сиятельство»

Музыка для сопровождения статьи https://www.youtube.com/watch?v=3zegtH-acXE

Табличка – наше всё

Программисты автоматизируют мир. Начав с автоматизации подсчета денег, где табличные записи естественны, программисты стали заталкивать в таблички всё, до чего их допустили. И это работало. В прошлом веке. И это хорошо т.к. из глубин вековой давности мы получили SQL – языка запросов к табличке. SQL хватает почти всегда – он простой, логичный и все его знают. А когда не хватает, его расширяют. И дальше мы будем работать с расширениями и диалектами SQL.

В реальном мире всё несколько сложнее. В табличках представлять большие и сложные зависимости сложно и не эффективно по ресурсам (дисковое пространство,процессор, и главный ресурс – время). А какие данные не идеальны для табличек? Может, это лишние данные?

Важный граф

В математике есть понятие граф – набор вершин (узлы) и связей этих вершин (ребро).

Минимальный граф – это две вершины и одно ребро между ними. Например этим минимальным графом мы можем нарисовать семейную пару: М ---- Ж. То есть граф – это естественные связи вещей и цепочки этих связей.

Например, записать структуру папок на компьютере в табличку. Для рубистов штук 5 гемов есть и только одно правильное решение – ltree + индексы. Кроме правильного есть идеальное решение: LDAP. А структуру организации в табличку? Тоже можно, но дурная затея, LDAP и тут выручает. И то и то деревья – есть корень и ветки, самая толстая ветка – ствол :-)

Не видел в жизни ни одного графа

Да даже мой любимый git это граф. Выполните в любом своём репозитории:

git log --graph --abbrev-commit --decorate --date=relative --all

И вы увидите дерево изменений.

Еще один граф – это родственные связи в Звездных Войнах (как вы помните, там только Император был сиротой): https://nplus1.ru/news/2016/02/10/star-wars-graph.

А еще на графах интернет держится. Например, алгоритм Дейкстры как раз решает задачу как бы быстрейший/кратчайший путь найти.

Когда у нас простой граф мы можем его нарисовать в табличках.

Графы кстати бывают разные. Граф, в котором связи направленные и не создают петель называется ... направленный ацикличный граф https://en.wikipedia.org/wiki/Directed_acyclic_graph. Этот тип графа важен для понимания логики работы Spark-приложений, так же он помогает формализировать бизнес процессы и ускорять их.

Если б на почте РФ знали о таких графах, то почтой бы чаще пользовались и не надо было в отделениях торговать сигаретами и водкой. И раз уж мы вспомнили Apache Spark, то RDD представляет из себя дерево разных версий RDD (если вы не поняли о чем речь, то читаем публикации создателя Spark ).

Зависимости в приложении тоже граф, например ruby-erd может для руби приложения нарисовать.

Neo4j и простые запросы

А если у нас тысячи узлов? То тоже можем использовать таблички, только устанем. Вот тут на помощь приходит специализированный инструмент – графовые базы данных. Их много, но мы выберем Neo4j – она сама простая для начала и имеет прекрасный веб-интерфейс с визуализацией, а красивая картинка иногда информативнее тысячи слов.

Есть графовые "базы", использующие табличное хранение данных, но использующие графовую абстракцию – эта порнография работает только в специфичных условиях (в этих же условиях платят хорошо, потому запоминаем cassandra + titan или Spark).

Потихоньку начнем изучать Neo4j с данными, доступными каждому.

Сначала запустим Neo4j, что очень легко, как и всё в Java мире:

  1. Качаете архив neo4j-community-3.0.3 с официального сайта http://neo4j.com/
  2. Распаковываете tar -xzvf ./neo4j-community-3.0.3-unix.tar.gz
  3. Запускаете ./bin/neo4j start

А вообще, не надо. Останавливайте: ./bin/neo4j stop. Сделаем современно и молодежно.

docker run --publish=7474:7474 \
--publish=7687:7687 \
--volume=$HOME/neo4j/data:/data:rw,z \
--env=NEO4J_AUTH=none  neo4j:3.0

А дальше самое интересное, причина почему стоит начинать с Neo4j: откройте в браузере http://localhost:7474/. Всё, вы – властелин графовой базы данных.

Интерактивная консоль и визуализация упростит старт. А еще мы отключили аутентификацию --env=NEO4J_AUTH=none. Для наших экспериментов она не нужна, а для боевых систем надо всё параноидально закрывать. Хотя это и смешно – закрывать и Docker :)

Так же там есть документация по возможностям системы и статистика по используемым ресурсам.

Создадим минимальный граф.

В данной статье главный диалект языка SQL зовётся “Кефир”: https://neo4j.com/developer/cypher-query-language/ :-)

CREATE (c:M {id: 'Male’'})-[:rel {weight:500} ]->(b:F {id: 'Female'});   

И посмотрим на результат:

MATCH (n) RETURN n LIMIT 1000;

Граф у нас направленный, даже в наш безумный век мужчина направлен к женщине. Любопытно, что направлением при поиске можно пренебречь:

MATCH p=(n)-->(b) return p

A можно не пренебрегать:

MATCH (n)-[p]->(b) WHERE startNode(p).id='Male' return b, n; // вернется наш граф
MATCH (n)-[p]->(b) WHERE startNode(p).id='Female' return b, n; // а тут уже пустой

Еще мы можем выбрать самые сильные отношения, чего размениваться на короткие интрижки:

MATCH (n)-[p]->(b) WHERE p.weight > 1000 return b, n /// таких нет, но вы можете снизить порог фильтра
Заметки по Кефировским запросам для игр на досуге.

Создаем ноды, если нет:

MERGE (c:M {id:'Male1'});

Создаем отношения, если их нет, и ноды если их нет:

MERGE (p:twivi13 {id: 'twivi13'})
MERGE (n:Real {id:'Real'})
CREATE UNIQUE (p)-[r:rel {weidth: 0}]-(n) return r

Обновляем вес отношений, если они есть:

MATCH (n)-[p]->(b) WHERE b.id='Male' AND n.id='Female'
SET p.weight = p.weight + 7
RETURN p

Смотрим в Neo4j только сотню самых сильных отношений:

MATCH (n)-[p]-(b)
RETURN p Order by p.weight DESC limit 100

Ищем вершины с более чем 2 связями:

MATCH (n) WHERE size((n)--())>2 RETURN n

Ну и если захотим удалить:

MATCH (n) DETACH DELETE n

Все классно, всё работает, всё просто. Но пользы ни какой.

Пишем приложения с Neo4j, Spark и данными Твиттера

Надо залить другие данные. И всплывает вопрос о том, как приложение вообще может общаться с Neo4j?

Есть поддержка формата для хипстеров HTTP API с JSON, в нём есть даже загрузка данных пачками. Так что с этой стороны все правильно, но зря.

Следующий вариант: протокол Bolt. Bolt бинарный, шифруемый и изобретен специально для Neo4j.

Каждое приложение должно придумать свой протокол, а то протоколов не хватает.

Есть уже драйвер для java и коннектор для Spark.

И раз уж есть такая возможность, то впихиваем все модные инструменты без разбору, особенно если в них нет нужды. И в модных инструментах выберем еще и модные названия: напишем приложение для Spark Streaming, которое будет читать твиттер, строить граф и обновлять связи в графе.

Есть прогноз что Spark Streaming через 5 лет будет не актуален, но сейчас за него платят. И он еще и не стриминг ни разу :)

Твиттер – доступный источник большого количества лишней информации. Можно даже считать его графом человеческой глупости. А вот задался бы кто целью проследить развитие идей – был бы наглядный аргумент за классическое образование (у меня его нет, о чем данная статья аж кричит).

Делаем модно:

Первым делом скачиваем исходники Spark на свою локальную машинку.

git clone https://github.com/apache/spark
git checkout branch-1.6

И в одном из примеров узнаем как подключиться к твиттеру и как его читать. И других примеров там много – полезно просмотреть, пролистать, чтобы знать, что Spark уже умеет.

Если это первое ваше знакомство с твиттером с этой стороны, то ключики и токены брать тут https://apps.twitter.com. После модификации примера получиться может так:

package org.apache.spark.examples.streaming

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.twitter._
// да да - тут эклипс бы отрефакторил всё, но я на виме
import org.apache.spark.streaming.twitter.TwitterUtils
import org.apache.spark.storage.StorageLevel
import org.apache.spark.SparkConf
import org.neo4j.driver.v1._

object Twittoneo {
  def main(args: Array[String]) {
    StreamingExamples.setStreamingLogLevels()

    // а сюда я сначала настоящие ключи записал :)
    System.setProperty("twitter4j.oauth.consumerKey", "tFwPvX5s")
    System.setProperty("twitter4j.oauth.consumerSecret", "KQvGuGColS12k6Mer45jnsxI")
    System.setProperty("twitter4j.oauth.accessToken", "113404896-O323jjsnI6mi2o6radP")
    System.setProperty("twitter4j.oauth.accessTokenSecret", "pkyl20ZjhtbtMnX")

    val sparkConf = new SparkConf().setAppName("Twitter2Neo4j")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    val stream = TwitterUtils.createStream(ssc, None)

    val actData = stream.map(status => {
      // extract from twitter json only interesting fields
      val author:String = status.getUser.getScreenName.replaceAll("_", "UNDERSCORE")
      val ments:Array[String] = status.getUserMentionEntities.map(x => {
                                           x.getScreenName.replaceAll("_", "UNDERSCORE")
                                       })
      (author, ments)
    })


    actData.foreachRDD(rdd => {
      if(!rdd.isEmpty())    {
        println("Open connection to Neo4jdb")
        val driver = GraphDatabase.driver( "bolt://localhost",
                                           AuthTokens.basic( "neo4j", "neo4j" ))
        try{
            // collect called for execute all logic on driver
            // http://spark.apache.org/docs/latest/cluster-overview.html
            rdd.collect().foreach(x =>{
              x._2.foreach( target =>{
                  val session = driver.session()
                  // create relation if not exist
                  // merge (p:twivi13 {id: 'twivi13'})
                  // merge (n:Real {id:'Real'})
                  // create unique (p)-[r:rel {weidth: 0}]-(n) return r
                  //
                  // pregix 'twi' and replacement _ to UNDESCORE require because Neo4j has many restrictions for label
                  val relation = "MERGE (p:twi"+ x._1+" {id:'twi"+x._1+"'}) MERGE (d:twi"+ target+" {id: 'twi" + target + "'}) CREATE UNIQUE (p)-[r:rel {weight: 0}]-(d) RETURN 1"
                  println(relation)
                  val result1 = session.run(relation)
                  result1.consume().counters().nodesCreated();
                  session.close()
              })
            })
        } finally {
            driver.close()
            println("Close Neo4j connection")
        }

        driver.close()
      }
    })



    ssc.start()
    ssc.awaitTermination()
  }
}

Обратите внимание – neo4j-spark-connector нам сейчас не нужен, вместо него мы возьмем neo4j-java-driver, который отвечает только за связь java-приложения и Neo4j , без выгрузок и загрузок в естественные форматы RDD или GraphFrame. Но в след. статье они нам понадобятся для настоящей работы носящей гордое имя Big Data инженер.

Компилируем и ставим в локальный Maven репозиторий библиотеку neo4j-java-driver:

git clone git@github.com:neo4j/neo4j-java-driver.git
cd neo4j-java-driver
mvn clean package install

Кладём наш код в src/main/scala/org/apache/spark/examples/streaming/Twitter2neo4j.scala. Находясь в папке examples добавляем в ./pom.xml:

   <dependency>
         <groupId>org.neo4j.driver</groupId>
         <artifactId>neo4j-java-driver</artifactId>
         <version>1.1-SNAPSHOT</version>
   </dependency>

Дальше компилируем примеры вместе с нашим: вызывайте mvn -Phadoop-2.6 -Dhadoop.version=2.6.0 -DskipTests clean package.

Кстати, на моём курсе первым делом учимся maven, gradle и sbt. Это основа основ. Важнее только bash.

У нас получается jar файлик с которым дальше будем работать. Вместо компиляции Spark опять же будем использовать модный Docker.

Обратите внимание на версию Spark – 1.6.1. Важно что б ваш пакет и Spark были одной версией, а то Spark так быстро развивается , что иногда даже слишком быстро.

Помним о безопасности. И еще один интересный момент за Docker – образ для нас собрал непонятно кто, а докер работает от рута (хипстеры писали , что с них взять).Теоретически, это огромная дыра в безопасности. Я-то смело выполняю данные действия - в наипоследнейшей Fedora с включенным selinux, ограждающим дырявый докер. А пользователей убунты не жалко. Кстати, супер крутая и модная версия докера называется chroot.

Запустим контейнер (коробочку, ведерко), экспортируя текущую папку в /shared c правом на только чтение(ro) и перемаркируя selinux контекст в svirt_sandbox_file_t.

Контекст SELinux для песочницы останется и после остановки контейнера. Так же экспортируем нами скомпилированную библиотеку для neo4j-java-driver.

А вот был бы у нас свой центральный репозиторий, например Nexus – было бы проще.

Или как вариант можно поправить pom.xml и собирать со всеми необходимыми зависимостями. У нас получится два контейнера которые должны общаться, поэтому пойдём по пути наименьшего сопротивления – Spark-контейнер будет использовать сетевой стек Neo4j контейнера, разделяя с ним ip адрес.

Для этого вызываем docker ps | grep -i neo4j | awk '{print $1}' и смотрим id контейнера.

docker run -it -v `pwd`:/shared:ro,z \
-v /path/to/neo4j-java-driver/driver/target:/neo4jadapter:ro,z \
--net=container:ca294ee9e614 \
sequenceiq/spark:1.6.0 bash

Вот мы оказались в контейнере. У нас есть хадуп и спарк. Но хадуп мы трогать не будем – он нам не мешает, а Spark запустим так, что б он выполнялся локально(--master local[4]):

spark-submit --jars /neo4jadapter/neo4j-java-driver-1.1-SNAPSHOT.jar \
--packages "org.apache.spark:spark-streaming-twitter_2.10:1.6.1" \
--class org.apache.spark.examples.streaming.Twittoneo \
--master local[4] \
--driver-memory 1g \
--executor-memory 1g \
--executor-cores 1 \
--verbous \
/shared/target/spark-examples_2.10-1.6.1.jar

Ждем минут хотя бы 10 и смотрим результат. Но чем больше данных соберем, тем интереснее.

Красота!!!

Если хотите краше, динамичнее и может даже в реальном времени, то на mkdev есть курс Екатерины Шпак. Там учат d3.js и прочим хитростям красивого фронтенда. Вообще в Big Data половина дела – это презентовать ваши достижения среднестатистическому менеджеру, у которого хоть и классическое образование, но без картинки не поймёт.

В данном примере есть один важный момент – не используется суперсила Spark. Ведь в Spark мы можем построить сразу готовый граф, аггрегировать все сообщения и только потом заливать в Neo4j. Этим можно значительно повысить количество информации перерабатываемое системой. Но кого это волнует в обучающем примере. Да даже в реальности это мало кого волнует :)

Выводы и прогнозы на следующую часть

Полученная база и та выборка, что мы делаем, не практична. Следующий шаг это найти например самых влиятельных пользователей (найти, дать им деньги и пусть рекламируют наш товар). Или отслеживать упоминание вашего бренда и в зависимости от ситуации (ругают, хвалят, кто отзывается) пытаться увеличить прибыль. Это очевидные примеры использования, но в славной стране Германия есть закон запрещающий хранить данные пользователей позволяющие явно идентифицировать оных.

Так вот, используя большой массив данных из разных источников можно пользователя узнавать и по косвенным уликам. Так что тот факт что у меня параноя совсем не отменяет слежки за каждым из нас. И приятнее быть еще и тем, кто следит, чем только тем, за кем следят. И графы тут очень помогают, о чем поговорим в следующей части. Кстати - заодно посмотрим решения, когда Neo4j не хватает, ведь при всех своих достоинствах Neo4j не масштабируется. Авторы Neo4j с этим не согласны, но кто им верит.

Кстати особенность всех nosql решений – база строится исходя из того что и как вы будете запрашивать. Когда кричат, что что-то schemaless – значит либо врут, либо невозможно работать :)

Дополнительное чтение

Subscribe to our Newsletter

Let us send you the best of what we've discovered in DevOps, Cloud and Kubernetes, as well us occasional event announcements.

We are also preparing some ways to learn together: weekly challenges, free courses and more. Subscribe now to be the first to get those.