Neo4j 从数据库 和 CSV 中导入数据

一、示例数据库

NorthWind 下载地址 Neo4j 数据集案例中的 NorthWind 数据库文件

选择 Postgresql 版本的数据:northwind.postgre.sql

数据库结构:

主要用到表 customers、suppliers、products、employees 和 categories。

二、建立图模型

从关系模型中导出图模型时,我们应牢记以下准则:

  • A row is a node
  • A table name is a label name

在此数据集中,以下图形模型用作第一次迭代:

图模型与关系模型有何不同?

  1. 没有null
    1. 在关系版本中,为了跟踪员工层次结构,如果他们不向任何人报告,我们在“ReportsTo”列中有一个空条目。 在图形版本中,我们只是没有定义关系。
    2. 现有值条目(属性)不存在。
  2. 它更详细地描述了这种关系。 例如,我们知道员工销售订单而不是在订单和员工表之间建立外键关系。 我们还可以根据需要选择添加关于该关系的更多元数据。
  3. 它通常会更加规范化。 例如,地址已经在几个表中被非规范化,但是在图模型的未来版本中,我们可能选择以自己的权限创建地址节点。

三、导入 Postgresql 数据库

为了可以远程访问,方便操作数据,和可以导入SQL脚本,需要修改配置文件。

修改后重启 systemctl restart postgresql.service

把 sql 脚本放到 /root/Documents 下:

对于postgresql导入sql文件的简单命令为:

psql -U username -W -d schemaname -f file.sql

四、导出 CSV 文件

通过定义的图模型,我们一共用到5张表,分别为 employees、orders、products、suppliers 和 categories。 把这几张表导出为CSV文件,注意,CSV 文件要带字段名称头的行,第一行为字段列。

现在我们知道了我们想要的图形,我们需要从 PostgreSQL 中提取数据,以便我们可以将其创建为图形。 最简单的方法是以 CSV 格式导出相应的表格。 PostgreSQL Copy 命令允许我们执行 SQL 查询并将结果写入 CSV 文件,例如 使用 psql -d northwind < export_csv.sql

export_csv.sql文件:

COPY (SELECT * FROM customers) TO '/tmp/customers.csv' WITH CSV header;
COPY (SELECT * FROM suppliers) TO '/tmp/suppliers.csv' WITH CSV header;
COPY (SELECT * FROM products)  TO '/tmp/products.csv' WITH CSV header;
COPY (SELECT * FROM employees) TO '/tmp/employees.csv' WITH CSV header;
COPY (SELECT * FROM categories) TO '/tmp/categories.csv' WITH CSV header;

COPY (SELECT * FROM orders
      LEFT OUTER JOIN order_details ON order_details.OrderID = orders.OrderID) TO '/tmp/orders.csv' WITH CSV header;

五、使用 Cypher 导入 CSV 文件到 Neo4J

在我们从 Postgresql 导出数据后,我们将使用 Cypher 的 LOAD CSV 命令将 CSV 文件的内容转换为图形结构。

高能预警:默认情况下,neo4j 需要将待导入的 csv 文件放置在安装目录下的 import 文件夹下,使用 file:/// 形式的协议去访问待导入文件。

如果嫌弃麻烦,也可以更改配置项 dbms.directories.import 值为磁盘文件夹绝对路径,便可以通过此文件夹路径来访问。

具体可参考 neo4j load csv 使用注意项指南

第一步:创建节点

import_csv.cypher 文件:

// Create customers
USING PERIODIC COMMIT
LOAD CSV WITH HEADERS FROM "file:///customers.csv" AS row
CREATE (:Customer {companyName: row.CompanyName, customerID: row.CustomerID, fax: row.Fax, phone: row.Phone});

// Create products
USING PERIODIC COMMIT
LOAD CSV WITH HEADERS FROM "file:///products.csv" AS row
CREATE (:Product {productName: row.ProductName, productID: row.ProductID, unitPrice: toFloat(row.UnitPrice)});

// Create suppliers
USING PERIODIC COMMIT
LOAD CSV WITH HEADERS FROM "file:///suppliers.csv" AS row
CREATE (:Supplier {companyName: row.CompanyName, supplierID: row.SupplierID});

// Create employees
USING PERIODIC COMMIT
LOAD CSV WITH HEADERS FROM "file:///employees.csv" AS row
CREATE (:Employee {employeeID:row.EmployeeID,  firstName: row.FirstName, lastName: row.LastName, title: row.Title});

// Create categories
USING PERIODIC COMMIT
LOAD CSV WITH HEADERS FROM "file:///categories.csv" AS row
CREATE (:Category {categoryID: row.CategoryID, categoryName: row.CategoryName, description: row.Description});

USING PERIODIC COMMIT
LOAD CSV WITH HEADERS FROM "file:///orders.csv" AS row
MERGE (order:Order {orderID: row.OrderID}) ON CREATE SET order.shipName =  row.ShipName;

接下来,我们将在刚刚创建的节点上创建索引,以确保在下一步创建关系时快速查找。

CREATE INDEX ON :Product(productID);
CREATE INDEX ON :Product(productName);
CREATE INDEX ON :Category(categoryID);
CREATE INDEX ON :Employee(employeeID);
CREATE INDEX ON :Supplier(supplierID);
CREATE INDEX ON :Customer(customerID);
CREATE INDEX ON :Customer(customerName);
CREATE CONSTRAINT ON (o:Order) ASSERT o.orderID IS UNIQUE;

由于索引是在插入节点之后创建的,因此它们的数量是异步发生的,因此我们使用 schema await(shell命令)来阻塞它们,直到它们被填充为止。

schema await

初始节点和索引到位后,我们现在可以创建订单与产品和员工的关系。

USING PERIODIC COMMIT
LOAD CSV WITH HEADERS FROM "file:///orders.csv" AS row
MATCH (order:Order {orderID: row.OrderID})
MATCH (product:Product {productID: row.ProductID})
MERGE (order)-[pu:PRODUCT]->(product)
ON CREATE SET pu.unitPrice = toFloat(row.UnitPrice), pu.quantity = toFloat(row.Quantity);

USING PERIODIC COMMIT
LOAD CSV WITH HEADERS FROM "file:///orders.csv" AS row
MATCH (order:Order {orderID: row.OrderID})
MATCH (employee:Employee {employeeID: row.EmployeeID})
MERGE (employee)-[:SOLD]->(order);

USING PERIODIC COMMIT
LOAD CSV WITH HEADERS FROM "file:///orders.csv" AS row
MATCH (order:Order {orderID: row.OrderID})
MATCH (customer:Customer {customerID: row.CustomerID})
MERGE (customer)-[:PURCHASED]->(order);

接下来,创建产品,供应商和类别之间的关系:

USING PERIODIC COMMIT
LOAD CSV WITH HEADERS FROM "file:///products.csv" AS row
MATCH (product:Product {productID: row.ProductID})
MATCH (supplier:Supplier {supplierID: row.SupplierID})
MERGE (supplier)-[:SUPPLIES]->(product);

USING PERIODIC COMMIT
LOAD CSV WITH HEADERS FROM "file:///products.csv" AS row
MATCH (product:Product {productID: row.ProductID})
MATCH (category:Category {categoryID: row.CategoryID})
MERGE (product)-[:PART_OF]->(category);

最后,我们将在员工之间创建 REPORTS_TO 关系来表示报告结构:

USING PERIODIC COMMIT
LOAD CSV WITH HEADERS FROM "file:///employees.csv" AS row
MATCH (employee:Employee {employeeID: row.EmployeeID})
MATCH (manager:Employee {employeeID: row.ReportsTo})
MERGE (employee)-[:REPORTS_TO]->(manager);

也可以使用一次运行整个脚本 bin/neo4j-shell -path northwind.db -file import_csv.cypher

生成的图形应如下所示:

我们现在可以查询结果图。

六、查询图

Which Employee had the Highest Cross-Selling Count of 'Chocolade' and Which Product?

MATCH (choc:Product {productName:'Chocolade'})<-[:PRODUCT]-(:Order)<-[:SOLD]-(employee),
      (employee)-[:SOLD]->(o2)-[:PRODUCT]->(other:Product)
RETURN employee.employeeID, other.productName, count(distinct o2) as count
ORDER BY count DESC
LIMIT 5;

看起来1号员工很忙!

How are Employees Organized? Who Reports to Whom?

MATCH path = (e:Employee)<-[:REPORTS_TO]-(sub)
RETURN e.employeeID AS manager, sub.employeeID AS employee;

请注意,5号员工有人向他们报告,但也向2号员工报告。

Which Employees Report to Each Other Indirectly?

MATCH path = (e:Employee)<-[:REPORTS_TO*]-(sub)
WITH e, sub, [person in NODES(path) | person.employeeID][1..-1] AS path
RETURN e.employeeID AS manager, sub.employeeID AS employee, CASE WHEN LENGTH(path) = 0 THEN "Direct Report" ELSE path END AS via
ORDER BY LENGTH(path);

How Many Orders were Made by Each Part of the Hierarchy?

MATCH (e:Employee)
OPTIONAL MATCH (e)<-[:REPORTS_TO*0..]-(sub)-[:SOLD]->(order)
RETURN e.employeeID, [x IN COLLECT(DISTINCT sub.employeeID) WHERE x <> e.employeeID] AS reports, COUNT(distinct order) AS totalOrders
ORDER BY totalOrders DESC;

七、更新图

现在,如果我们想要更新图形数据,我们必须首先找到相关信息,然后更新或扩展图形结构。

Janet is now reporting to Steven

我们需要首先找到 Steven,以及 Janet 和她的 REPORTS_TO 关系。 然后我们删除现有关系并为 Steven 创建一个新关系。

MATCH (mgr:Employee {EmployeeID:5})
MATCH (emp:Employee {EmployeeID:3})-[rel:REPORTS_TO]->()
DELETE rel
CREATE (emp)-[:REPORTS_TO]->(mgr)
RETURN *;

只需更新组织层次结构的一部分,即可进行单一关系更改。 所有后续查询将立即使用新结构。

八、拓展阅读

From SQL to Cypher – A hands-on Guide

https://neo4j.com/developer/guide-sql-to-cypher/

importing CSV Data into Neo4j

https://neo4j.com/developer/guide-import-csv/

Tool: SQL to Neo4j Import

https://neo4j.com/blog/loading-sql-neo4j-like-magic/

九、API 导入简单示例

@Test
void testInsert()
{
    Driver driver = GraphDatabase.driver("bolt://192.190.10.170:7687", AuthTokens.basic("neo4j", "neo4j"));
    try (Session session = driver.session())
    {
        int index = 1;
        while (index < 100000000)
        {
            try
            {
                String cql = "CREATE (book:Book {page:$page, title:$title, author:$author})";
                session.run(cql, parameters("page", index, "title", "Learning " + index, "author", "Liu " + index));
                if (index % 10000 == 0)
                {
                    System.out.println("done book " + index);
                }
                index++;
            }
            catch (Exception e)
            {
                System.out.println("error happened at index " + index);
            }
        }
    }
}
如果觉得这对你有用,请随意赞赏,给与作者支持
评论 1
最新评论
#1 楼 污妖神尊 2018-08-08

这种方式造数据比较方便!