網站域名可以更改嗎 SEO 引擎優化公司
neo4j 圖表數據導入到 TuGraph
- 代碼文件
- 說明
- 后文
前言:近期在引入阿里的 TuGraph 圖數據庫,需要將原 neo4j 數據導入到新的 TuGraph 數據庫中。預期走 CSV 文件導入導出,但因為格式和數據庫設計問題,操作起來比較麻煩(可能是個人沒有發現其他比較方便的辦法),因此寫了一個 Python 腳本進行數據導入操作。
使用:python3,TuGraph 4.5.1
遇到的問題:TuGraph 的節點需要一個主鍵,這個只能自行指定。
支持:指定節點、指定邊;自動創建不存在的節點/邊;節點數據批量導入,邊單條導入(試過批量導入,TuGraph 好像不支持;官網的 CALL db.upsertEdge 在我的版本中也還沒實現)。
導入圖示:
代碼文件
# import time
import json
from typing import Dict, List, cast


class GraphConnector:
    """Bolt-based connector used for both the Neo4j source and the TuGraph target.

    The class-level lists and mapping below configure what the import script
    copies: which node labels, which relationship types, and which property
    acts as the primary key of each node label.
    """

    db_type: str = "tugraph"
    driver: str = "bolt"
    dialect: str = "cypher"
    batch_size: int = 100
    # Primary-key property for each node label.
    node_pro_key: dict = {
        'Ren': 'zjhm',
        'Aj': 'ajbh',
        'Che': 'rowkey',
        'Hh': 'rowkey',
        'Sj': 'dhhm',
    }
    # Relationship types that should be imported.
    specified_relation = ['ajgx', 'th', 'tfj', 'sysj', 'sycl']
    # Node labels that should be imported.
    specified_node = ['Ren', 'Aj', 'Che', 'Sj', 'Hh']

    def __init__(self, driver, graph):
        """Initialize the connector with a Neo4j (Bolt) driver and a graph name."""
        self._driver = driver
        self._schema = None
        self._graph = graph
        self._session = None

    @classmethod
    def from_uri_db(
        cls, host: str, port: int, user: str, pwd: str, db_name: str, db_type: str
    ) -> "GraphConnector":
        """Create a connector from host, port, user, pwd, db_name and db_type.

        Raises:
            ImportError: if the ``neo4j`` package is not installed.
        """
        try:
            from neo4j import GraphDatabase
        except ImportError as err:
            raise ImportError(
                "neo4j package is not installed, please install it with "
                "`pip install neo4j`"
            ) from err

        db_url = f"{cls.driver}://{host}:{str(port)}"
        driver = GraphDatabase.driver(db_url, auth=(user, pwd))
        try:
            driver.verify_connectivity()
        except Exception:
            # Do not leak the driver when the server is unreachable.
            driver.close()
            raise

        connector = cast(GraphConnector, cls(driver=driver, graph=db_name))
        # Store the type on the instance: the script holds a Neo4j and a
        # TuGraph connector at the same time, and writing to the class would
        # make each one overwrite the other's db_type.
        connector.db_type = db_type
        return connector

    def create_graph_new(self, graph_name: str) -> bool:
        """Create ``graph_name`` if it does not exist yet.

        Returns:
            True when the graph was created by this call, False when it
            already existed.
        """
        try:
            with self._driver.session() as session:
                graph_list = session.run("CALL dbms.graph.listGraphs()").data()
                exists = any(item["graph_name"] == graph_name for item in graph_list)
                if not exists:
                    session.run(
                        f"CALL dbms.graph.createGraph('{graph_name}', '', 2048)"
                    )
        except Exception as e:
            raise Exception(
                f"Failed to create graph '{graph_name}': {str(e)}"
            ) from e
        return not exists

    def create_vertex_labels(self, json_data):
        """Create a vertex label from its JSON description."""
        try:
            with self._driver.session(database=self._graph) as session:
                session.run(
                    "CALL db.createVertexLabelByJson($json_data)",
                    json_data=json_data,
                )
        except Exception as e:
            raise Exception("Failed to create vertex_labels ") from e

    def batch_update_node(self, json_data):
        """Upsert nodes in bulk: insert when missing, update when present."""
        try:
            with self._driver.session(database=self._graph) as session:
                # Session has no `upsertVertex` method; the procedure has to
                # be invoked through `run`.
                # NOTE(review): the single-parameter call shape is kept from
                # the original; confirm it against the db.upsertVertex
                # signature of the deployed TuGraph version.
                session.run("CALL db.upsertVertex($json_data)", json_data=json_data)
        except Exception as e:
            raise Exception("Failed to upsert vertices") from e

    def batch_update_edge(self, json_data):
        """Upsert relationships in bulk: insert when missing, update when present."""
        try:
            with self._driver.session(database=self._graph) as session:
                # NOTE(review): same caveat as batch_update_node; confirm the
                # db.upsertEdge argument list for the deployed version.
                session.run("CALL db.upsertEdge($json_data)", json_data=json_data)
        except Exception as e:
            raise Exception("Failed to upsert edges") from e

    def create_edge_labels(self, json_data):
        """Create an edge label from its JSON description."""
        try:
            with self._driver.session(database=self._graph) as session:
                session.run(
                    "CALL db.createEdgeLabelByJson($json_data)",
                    json_data=json_data,
                )
        except Exception as e:
            raise Exception("Failed to create edge_labels ") from e

    def run(self, query: str, fetch: str = "all") -> List:
        """Run ``query`` and return all result records as a list.

        ``fetch`` is accepted for interface compatibility and is not used.
        """
        with self._driver.session(database=self._graph) as session:
            try:
                result = session.run(query)
                return list(result)
            except Exception as e:
                raise Exception(
                    f"Query execution failed: {e}\nQuery: {query}"
                ) from e

    def check_label_exists(self, label: str, label_type: str) -> bool:
        """Return True when ``label`` exists.

        Vertex labels are checked when ``label_type`` is ``"node"``; any other
        value checks edge labels.
        """
        if label_type == "node":
            procedure = "CALL db.vertexLabels()"
        else:
            procedure = "CALL db.edgeLabels()"
        with self._driver.session(database=self._graph) as session:
            rows = session.run(procedure).data()
            return any(row["label"] == label for row in rows)

    def get_columns(self, table_name: str, table_type: str = "vertex") -> List[Dict]:
        """Retrieve the columns of a vertex or edge label in the graph db."""
        if table_type == "vertex":
            query = f"CALL db.getVertexSchema('{table_name}')"
        else:
            query = f"CALL db.getEdgeSchema('{table_name}')"
        with self._driver.session(database=self._graph) as session:
            result = session.run(query).data()
            schema_info = json.loads(result[0]["schema"])
            primary = schema_info.get("primary")
            return [
                {
                    "name": prop["name"],
                    "type": prop["type"],
                    "default_expression": "",
                    "is_in_primary_key": bool(
                        "primary" in schema_info and prop["name"] == primary
                    ),
                    "comment": prop["name"],
                }
                for prop in schema_info.get("properties", [])
            ]

    def close(self):
        """Close the Neo4j driver."""
        self._driver.close()


# {"name": "id", "type": "STRING", "optional": False},
# {"name": "name", "type": "STRING", "optional": False, "index": True},
# {"name": "num", "type": "STRING", "optional": False, "unique": True},
# {"name": "desc", "type": "STRING", "optional": True}
# Build the node (vertex label) JSON that TuGraph uses to create a node label
def bulid_node_json(node_name: str, pro_key: str, node_properties):
    """Build the JSON string that describes a TuGraph vertex label.

    Args:
        node_name: label of the vertex.
        pro_key: name of the property used as primary key.
        node_properties: rows whose first element is a property name.

    Returns:
        A JSON string in which every property is a non-optional STRING.
    """
    vertex_label_json = {
        "label": node_name,
        "primary": pro_key,
        "type": "VERTEX",
        "detach_property": True,
        "properties": [
            {"name": node_property[0], "type": "STRING", "optional": False}
            for node_property in node_properties
        ],
    }
    return json.dumps(vertex_label_json)


def bulid_edge_json(edge_name: str, edge_properties, start_node_key, end_node_key):
    """Build the JSON string that describes a TuGraph edge label.

    Args:
        edge_name: label of the edge.
        edge_properties: rows of (property name, start labels, end labels).
            The first row supplies the (start, end) constraint pair.
        start_node_key: primary-key property of the start node (currently
            unused; kept for callers).
        end_node_key: primary-key property of the end node (currently unused;
            kept for callers).

    Returns:
        A JSON string in which every property is an optional STRING.

    Raises:
        IndexError: if ``edge_properties`` is empty.
    """
    first_row = edge_properties[0]

    # The same property name shows up once per (start labels, end labels)
    # combination in the rows, so keep only its first occurrence.
    seen = set()
    properties = []
    for edge_property in edge_properties:
        name = edge_property[0]
        if name in seen:
            continue
        seen.add(name)
        properties.append({"name": name, "type": "STRING", "optional": True})

    edge_label_json = {
        "label": edge_name,
        "type": "EDGE",
        "detach_property": True,
        "constraints": [[first_row[1][0], first_row[2][0]]],
        "properties": properties,
    }
    return json.dumps(edge_label_json)


def neo4jNode2Tugrapg(connector, tugraphConn):
    """Copy the configured node labels and their nodes from Neo4j to TuGraph.

    Args:
        connector: connector to the Neo4j source.
        tugraphConn: connector to the TuGraph target.
    """
    query = "CALL db.labels() YIELD label RETURN label;"
    print("Executing query:", query)
    results_nodes = connector.run(query)
    print(f"所有的節點:{results_nodes}")
    print("指定的節點:", connector.specified_node)

    for node in results_nodes:
        label = node[0]
        # Filter first so skipped labels do not trigger a property scan.
        # An empty specified_node list means "import every label".
        if connector.specified_node and label not in connector.specified_node:
            continue

        # Collect the distinct property names used by nodes of this label.
        query = f"MATCH (n:{label}) UNWIND keys(n) AS key RETURN DISTINCT key"
        node_properties = connector.run(query)
        print(f"當前 neo4j 節點 {label} , roperties : {node_properties}!!")

        if tugraphConn.check_label_exists(label, "node"):
            print(label, "節點已經存在!")
        else:
            print(label, "節點不存在,需要新建!")
            node_json = bulid_node_json(
                label, connector.node_pro_key[label], node_properties
            )
            tugraphConn.create_vertex_labels(node_json)

        # NOTE(review): the collapsed original does not show whether the sync
        # ran only for newly created labels; it is done for every selected
        # label here -- confirm.
        queryNode = f"MATCH (n:{label}) RETURN n"
        synchronize_node(label, connector.run(queryNode), node_properties, tugraphConn)


# node_name: label of the nodes being synchronized
# node_result: node records returned by the Neo4j query
# tugraphConn: TuGraph connector
# Build CREATE statements for the nodes and run them through tugraphConn, 300 nodes per statement
# CREATE (:node1 {id: "2", name: "李四", num: "001", desc: "李四的信息"}),
# (:node1 {id: "3", name: "李四", num: "001", desc: "李四的信息"});
def _quote_node_value(value) -> str:
    """Render ``value`` as a single-quoted Cypher string literal."""
    # Escape backslashes first, then quotes, so a value such as O'Brien
    # cannot terminate the literal and break the statement.
    text = str(value).replace("\\", "\\\\").replace("'", "\\'")
    return f"'{text}'"


def synchronize_node(node_name: str, node_result, node_properties, tugraphConn,
                     batch_size: int = 300):
    """Insert the Neo4j nodes of one label into TuGraph in batched CREATEs.

    Args:
        node_name: label of the nodes.
        node_result: records from Neo4j; element 0 of each record is the node.
        node_properties: rows whose first element is a property name the node
            must have; nodes missing any of them are skipped.
        tugraphConn: target connector; only its ``run(query)`` is used.
        batch_size: number of nodes per CREATE statement (default 300).

    Raises:
        ValueError: if ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")

    print(f"同步 {node_name} 節點共 {len(node_result)} 記錄,請等待執行完成...")
    required_keys = [key[0] for key in node_properties]
    pending = []
    count = 0
    skip_num = 0

    for node in node_result:
        # Read the properties through the mapping interface instead of the
        # private `_properties` attribute.
        item = dict(node[0])

        # Skip nodes whose properties do not cover the label's schema.
        if any(key not in item for key in required_keys):
            skip_num += 1
            continue

        # Every value is written as a string, matching the all-STRING schema.
        props = ", ".join(
            f"{key}: {_quote_node_value(value)}" for key, value in item.items()
        )
        pending.append(f"(:{node_name} {{{props}}})")
        count += 1

        if len(pending) >= batch_size:
            tugraphConn.run(f"CREATE {', '.join(pending)}")
            pending = []

    # Flush the final partial batch.
    if pending:
        tugraphConn.run(f"CREATE {', '.join(pending)}")

    print(f"所有 {node_name} 節點同步完成,共 {len(node_result)} 條記錄,不符合要求 {skip_num} 條;成功導入 {count} 條!")


# Import edges
def neo4jEdge2Tugrapg(connector, tugraphConn):
    """Copy the configured relationship types and their edges to TuGraph.

    Args:
        connector: connector to the Neo4j source.
        tugraphConn: connector to the TuGraph target.
    """
    query = "CALL db.relationshipTypes() YIELD relationshipType RETURN relationshipType;"
    print("Executing query:", query)
    results_dege = connector.run(query)
    print(f"所有的關系:{results_dege}")
    print(f"指定的關系:{connector.specified_relation}")

    for edge in results_dege:
        edge_name = edge[0]
        # An empty specified_relation list means "import every type".
        if connector.specified_relation and edge_name not in connector.specified_relation:
            continue

        # Property names of this relationship type plus its endpoint labels.
        query = (
            f"MATCH (n1)-[r:{edge_name}]->(n2) UNWIND keys(r) AS key "
            "RETURN DISTINCT key, labels(n1) AS start_node_labels, "
            "labels(n2) AS end_node_labels"
        )
        edge_properties = connector.run(query)
        if not edge_properties:
            # UNWIND yields no rows when no relationship of this type has
            # properties, so the endpoint labels cannot be read from the
            # result; skip instead of failing with IndexError.
            print(f"{edge_name}: no relationship properties found, cannot infer endpoint labels, skipped")
            continue

        start_node = edge_properties[0][1][0]
        end_node = edge_properties[0][2][0]
        # Same rule as the node import: an empty specified_node list places
        # no restriction on the endpoints.
        if connector.specified_node and (
            start_node not in connector.specified_node
            or end_node not in connector.specified_node
        ):
            print(f"{edge_name}關系中存在不符合要求的節點,跳過!")
            continue

        if tugraphConn.check_label_exists(edge_name, "edge"):
            print(edge_name, "關系已經存在!")
        else:
            print(edge_name, "關系不存在,需要新建!")
            node_json = bulid_edge_json(
                edge_name,
                edge_properties,
                connector.node_pro_key[start_node],
                connector.node_pro_key[end_node],
            )
            tugraphConn.create_edge_labels(node_json)

        # NOTE(review): as with the node import, the collapsed original does
        # not show whether this ran only for new labels; it runs for every
        # selected type here -- confirm.
        queryNode = f"MATCH (n1)-[r:{edge_name}]->(n2) RETURN n1,r,n2;"
        results = connector.run(queryNode)
        synchronize_edge(edge_name, results, start_node, end_node, tugraphConn)


def _quote_edge_value(value) -> str:
    """Render ``value`` as a single-quoted Cypher string literal."""
    # Escape backslashes first, then quotes, so values cannot break out of
    # the literal.
    text = str(value).replace("\\", "\\\\").replace("'", "\\'")
    return f"'{text}'"


def synchronize_edge(edge_name: str, edge_results, start_node_name, end_node_name, tugraphConn):
    """Insert the Neo4j relationships of one type into TuGraph, one per query.

    Args:
        edge_name: relationship type.
        edge_results: records of (n1, r, n2) from Neo4j.
        start_node_name: label the start node must carry.
        end_node_name: label the end node must carry.
        tugraphConn: target connector; ``node_pro_key`` and ``run(query)``
            are used.
    """
    print(f"同步 {edge_name} 關系共 {len(edge_results)} 記錄,請等待執行完成...")
    start_pro_key = tugraphConn.node_pro_key[start_node_name]
    end_pro_key = tugraphConn.node_pro_key[end_node_name]
    count = 0
    skip_num = 0

    for edge in edge_results:
        for gx in edge:
            # Only the relationship value in the record has a `type`
            # attribute; the two node values are passed over.
            if not hasattr(gx, 'type'):
                continue

            start_node = gx.start_node
            end_node = gx.end_node
            # Membership test rather than list(labels)[0]: label sets are
            # unordered, so the first element is arbitrary for nodes that
            # carry more than one label.
            if (start_node_name not in start_node.labels
                    or end_node_name not in end_node.labels):
                skip_num += 1
                break

            start_pro_val = start_node.get(start_pro_key)
            end_pro_val = end_node.get(end_pro_key)
            if start_pro_val is None or end_pro_val is None:
                # Without both primary-key values the endpoints cannot be
                # matched in TuGraph.
                skip_num += 1
                break

            rel_props_list = [
                f"{key}: {_quote_edge_value(value)}"
                for key, value in dict(gx).items()
            ]
            rel_props_str = "{ " + ", ".join(rel_props_list) + " }"

            create_edge_cypher = (
                f"MATCH (n1:{start_node_name} "
                f"{{{start_pro_key}: {_quote_edge_value(start_pro_val)}}}), "
                f"(n2:{end_node_name} "
                f"{{{end_pro_key}: {_quote_edge_value(end_pro_val)}}}) "
                f"CREATE (n1)-[:{edge_name} {rel_props_str}]->(n2);"
            )
            tugraphConn.run(create_edge_cypher)
            count += 1

    # TODO: batch the inserts. The original carried a commented-out sketch,
    # marked as not supported by TuGraph, of the form:
    #   UNWIND [{startId: ..., endId: ..., relProps: {...}}, ...] AS relData
    #   MATCH (a:<start> {<key>: relData.startId}), (b:<end> {<key>: relData.endId})
    #   MERGE (a)-[r:<edge>]->(b) SET r += relData.relProps RETURN r;
    print(f"所有 {edge_name} 節點同步完成,共 {len(edge_results)} 條記錄,不符合要求 {skip_num} 條;成功導入 {count} 條!")


# Create the connectors
def conn_tugraph(host="1111", port=111, user="111", password="111",
                 db_name="test121", db_type="tugraph"):
    """Create the connector to the TuGraph target.

    The defaults are the placeholder settings that used to be hard-coded in
    the body; pass real values to connect to an actual server.
    """
    return GraphConnector.from_uri_db(host, port, user, password, db_name, db_type)
def conn_neo4j(host="11111", port=111, user="111", password="111111",
               db_name="111", db_type="neo4j"):
    """Create the connector to the Neo4j source.

    The defaults are the placeholder settings that used to be hard-coded in
    the body; pass real values to connect to an actual server.
    """
    return GraphConnector.from_uri_db(host, port, user, password, db_name, db_type)


def main():
    """Copy nodes, then edges, from Neo4j into TuGraph."""
    neo4jConn = conn_neo4j()
    try:
        tugraphConn = conn_tugraph()
        try:
            print("Successfully connected to Graph!")
            # Create the target TuGraph graph if it does not exist yet. It
            # can also be created manually beforehand.
            tugraphConn.create_graph_new("test121")
            # Nodes go first: edges are attached by matching existing nodes.
            neo4jNode2Tugrapg(neo4jConn, tugraphConn)
            neo4jEdge2Tugrapg(neo4jConn, tugraphConn)
        finally:
            # Close the connections even when the import fails part-way.
            tugraphConn.close()
    finally:
        neo4jConn.close()
    print("Connection closed.")


if __name__ == "__main__":
    main()
說明
只是用 Python 簡單寫了一個可以執行導入操作的腳本,歡迎指正和優化。邊的導入比較慢(單條導入)。
有兩種優化思路:
一、Cypher 語句:
UNWIND [{startId:'11', endId:'21', relProps:{ hphm: '33', sj: '44' }},
{startId:'22', endId:'23', relProps:{ hphm: '44', sj: '20080102' }},
{startId:'33', endId:'24', relProps:{ hphm: '55', sj: '20120110' }}] AS relData
MATCH (a:Ren {zjhm: relData.startId}), (b:Che {rowkey: relData.endId})
MERGE (a)-[r:sycl]->(b)
SET r += relData.relProps
RETURN r;
二、
https://tugraph-db.readthedocs.io/zh-cn/latest/development_guide.html Tugraph 官網(wǎng)的批量操作
CALL db.upsertEdge('edge1',{type:'node1',key:'node1_id'}, {type:'node2',key:'node2_id'}, [{node1_id:1,node2_id:2,score:10},{node1_id:3,node2_id:4,score:20}])
代碼里面留了,
但我的版本好像都不支持!!!
后文
歡迎討論