A Simple Knowledge-Graph-Based Film and TV Recommendation System
Background
This is a demo project I built last September while studying knowledge graphs and recommendation. It grew out of an open-source knowledge-graph project for the automotive industry that I found on GitHub. I mainly reworked that project, turning it into a knowledge-graph-based film and TV recommendation system.
Environment
Python 3, the Flask web framework, and the Neo4j graph database (3.3.1).
The operating system is Windows 10.
Project structure
After cloning the automotive project mentioned above, the overall project structure looks like the figure below.
It contains two versions of the project, "first acceptance" (第一次验收) and "second acceptance" (第二次验收). The main difference between them is the database: the former uses MySQL, the latter Neo4j. My rework is based on the second version. Opening it, its internal structure is shown in the figure below.
Pipeline analysis
Next, let's walk through the original project's workflow step by step; only by understanding it can we rework it.
Reading and inserting the data
First we need to get the data into Neo4j, so we start by launching Neo4j: open cmd and enter the following command
neo4j console
If cmd then shows the message below, Neo4j has started.
The last line shows the available address http://localhost:7474, which is where we access Neo4j. Open a browser, paste this address into the address bar, press Enter, and the Neo4j console page appears, as shown below.
With the database running, open the kg\kg.py file in the project. Its main code is as follows.
def data_init(self):
    # Connect to the graph database
    print('Start data preprocessing')
    self.graph = Graph('http://localhost:7474', user="neo4j", password="szc")
    self.selector = NodeSelector(self.graph)
    self.graph.delete_all()

def insert_datas(self):
    print('Start inserting data')
    with open('../data/tuples/three_tuples_2.txt', 'r', encoding='utf-8') as f:
        lines, num = f.readlines(), -1
        for line in lines:
            num += 1
            if num % 500 == 0:
                print('Progress: {}/{}'.format(lines.index(line), len(lines)))
            line = line.strip().split(' ')
            if len(line) != 3:
                print('insert_datas error:', line)
                continue
            self.insert_one_data(line)

def insert_one_data(self, line):
    if '' in line:
        print('insert_one_data error', line)
        return
    start = self.look_and_create(line[0])
    for name in self.get_items(line[2]):
        end = self.look_and_create(name)
        r = Relationship(start, line[1], end, name=line[1])
        self.graph.create(r)  # does not create a duplicate if it already exists

# Look up whether a node exists; create one if it does not
def look_and_create(self, name):
    end = self.graph.find_one(label="car_industry", property_key="name", property_value=name)
    if end is None:
        end = Node('car_industry', name=name)
    return end

def get_items(self, line):
    if '{' not in line and '}' not in line:
        return [line]
    # sanity check
    if '{' not in line or '}' not in line:
        print('get_items error', line)
    lines = [w[1:-1] for w in re.findall('{.*?}', line)]
    return lines
The data_init() function at the top connects to the Neo4j database; you just pass in the database address, user name, and password. It then calls graph.delete_all() to clear the old data before inserting; whether to keep this step depends on your own business scenario.
Next comes insert_datas(), which reads the txt file, iterates over its lines, and calls insert_one_data() on each line to parse it and create the nodes and relationships. From the code you can see that each line has the form "start relation end"; for example "安阳 位置 豫北" means the entities 安阳 and 豫北 are linked by the relation 位置 (location), in the direction 安阳 --> 位置 --> 豫北.
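As a quick sanity check, the triple format can be parsed with a few lines of plain Python. This is a standalone sketch; parse_triple is a hypothetical helper, not part of the project.

```python
# Hypothetical helper mirroring how insert_datas() treats each line:
# split on spaces and reject anything that is not exactly "start relation end".
def parse_triple(line):
    parts = line.strip().split(' ')
    if len(parts) != 3 or '' in parts:
        return None  # malformed lines are skipped, as in insert_datas()
    return tuple(parts)

print(parse_triple("安阳 位置 豫北"))  # ('安阳', '位置', '豫北')
print(parse_triple("只有 两列"))      # None
```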
When insert_one_data() is called, it first queries the database for a node with the same name and, depending on the result, either reuses the existing node or creates a new one; this is what look_and_create() does.
In look_and_create(), "car_industry" is the node label (I think of it as roughly the database name in MySQL, the one you select with use some_database). In find_one(), property_key corresponds to the parameter name name of the Node constructor, and property_value is the value of that name parameter, i.e. the entity's name. Taking my hometown 安阳 as an example, its storage in Neo4j can be understood as {property_key: "name", property_value: "安阳"}.
The final get_items() function validates and splits the endpoint field; I won't dwell on it.
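For completeness, the brace-splitting behavior of get_items() can be reproduced standalone (same logic, lifted out of the class):

```python
import re

# Standalone version of get_items(): a plain value passes through as a
# single-item list, while "{A}{B}" expands into ["A", "B"].
def get_items(field):
    if '{' not in field and '}' not in field:
        return [field]
    return [w[1:-1] for w in re.findall('{.*?}', field)]

print(get_items("豫北"))          # ['豫北']
print(get_items("{豫北}{华北}"))  # ['豫北', '华北']
```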
Running the service
Once all the data is in the database, we can run the service. It lives in run_server.py, whose code is as follows.
if __name__ == '__main__':
    args = get_args()
    print('\nhttp_host:{},http_port:{}'.format('localhost', args.http_port))
    app.run(debug=True, host='210.41.97.169', port=8090)
The key is really the single app.run() call; just replace the IP and port with your own.
Handling page requests
Our business logic is: type a URL with parameters into the browser and get back the relevant results.
Parameter handling lives in views.py, whose main code is as follows.
@app.route('/KnowGraph/v2', methods=["POST"])
def look_up():
    kg = KnowGraph(get_args())
    client_params = request.get_json(force=True)
    server_param = {}
    if client_params['method'] == 'entry_to_entry':
        kg.lookup_entry2entry(client_params, server_param)
    elif client_params['method'] == 'entry_to_property':
        kg.lookup_entry2property(client_params, server_param)
    elif client_params['method'] == 'entry':
        kg.lookup_entry(client_params, server_param)
    elif client_params['method'] == 'statistics':
        kg.lookup_statistics(client_params, server_param)
    elif client_params['method'] == 'live':
        params = {'success': 'true'}
        server_param['result'] = params
    server_param['id'] = client_params['id']
    server_param['jsonrpc'] = client_params['jsonrpc']
    server_param['method'] = client_params['method']
    print(server_param)
    return json.dumps(server_param, ensure_ascii=False).encode("utf-8")
As you can see, POST requests to /KnowGraph/v2 are routed to look_up(), which dispatches to different methods of the kg object, and thus different query logic, based on the method parameter.
However, when we type a path and parameters into the browser and press Enter, we are fetching database information, which is clearly a GET request. Moreover, there is no route that passes data to the Flask template, so this file needs a major rework.
Querying the data
As just mentioned, views.py calls different methods of the kg object, and gets different results, depending on the value of the method parameter.
The kg object's class, KnowGraph, lives in modules.py. Taking the simplest and most basic query, the entity lookup, as an example, let's see how it is implemented. It corresponds to lookup_entry(), whose code is as follows.
def lookup_entry(self, client_params, server_param):
    # Supports setting the search depth
    start_time = time.time()
    params = client_params["params"]
    edges = set()
    self.lookup_entry_deep(edges, params, 0)
    if len(edges) == 0:
        server_param['result'] = {"success": 'false'}
    else:
        server_param['result'] = {'edges': [list(i) for i in edges], "success": 'true'}
    print('Triples found: {}, time taken: {}s'.format(len(edges), time.time() - start_time))
Apart from the timing, it mainly extracts params from the client parameters, which holds the entity name to look up and the search depth, then calls lookup_entry_deep() to do the search. Results accumulate in the edges set; finally each item of edges is converted to a list and stored under 'edges' in server_param's 'result' entry for the return value.
Next, let's look at the implementation of lookup_entry_deep(); the code is as follows.
def lookup_entry_deep(self, edges, params, deep):
    # The current depth must not reach the requested depth
    if deep >= params['deep']:
        return
    # Forward lookup
    result1 = self.graph.data("match (s)-[r]->(e) where s.name='{}' return s.name,r.name,e.name".format(params['name']))
    # Reverse lookup
    result2 = self.graph.data("match (e)<-[r]-(s) where e.name='{}' return s.name,r.name,e.name".format(params['name']))
    if len(result1) == 0 and len(result2) == 0:
        return
    for item in result1:
        edges.add((item['s.name'], item['r.name'], item['e.name']))
        if item['s.name'] != item['e.name']:  # avoid the 双面胶:中文名:双面胶 infinite loop
            params['name'] = item['e.name']
            self.lookup_entry_deep(edges, params.copy(), deep + 1)
    for item in result2:
        edges.add((item['s.name'], item['r.name'], item['e.name']))
        if item['s.name'] != item['e.name']:  # avoid the 双面胶:中文名:双面胶 infinite loop
            params['name'] = item['e.name']
            self.lookup_entry_deep(edges, params.copy(), deep + 1)
First, if the depth limit is exceeded, return immediately. Then, for the name entry in params (the entity to look up), run a forward query and a reverse query against the database, save every result row as a tuple in the edges set, and recurse into this function with the depth incremented by 1.
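The recursion is easier to see without Neo4j in the way. Below is a minimal sketch of the same depth-limited expansion over an in-memory triple list; the names and data are made up for illustration.

```python
# Toy triple store standing in for the graph database.
TRIPLES = [
    ("ShowA", "director", "DirectorX"),
    ("ShowB", "director", "DirectorX"),
    ("ShowA", "tag", "war"),
]

def lookup_deep(name, max_deep, edges, deep=0):
    if deep >= max_deep:          # depth limit, as in lookup_entry_deep()
        return
    forward = [t for t in TRIPLES if t[0] == name]   # (s)-[r]->(e)
    reverse = [t for t in TRIPLES if t[2] == name]   # (e)<-[r]-(s)
    for s, r, e in forward + reverse:
        edges.add((s, r, e))
        if s != e:                # avoid self-loop recursion
            lookup_deep(e if s == name else s, max_deep, edges, deep + 1)

edges = set()
lookup_deep("ShowA", 2, edges)
print(sorted(edges))  # ShowB is reached via the shared director at depth 2
```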
The rework
That is the existing pipeline; next, we rework it for the film and TV recommendation scenario.
Suppose a user has watched the TV series 上将XXX. Based on its director, actors, release region, language, genre tags, and so on, we can recommend other shows they might be interested in.
Data format
Our files are all stored in the wiki directory as txt files, where every line is a JSON object; one line looks like this:
{..... "title": "上将XXX", "wikiData": {....."wikiInfo": {"country": "中国大陆", "language": "普通话", "directors": ["安澜"], "actors": ["宋春丽", "王伍福", "张秋歌", "范明", "刘劲", "陶慧敏", "侯勇"], ....}, ...."wikiTags": ["电视剧", "历史", "战争", "军旅", "革命", "动作", "热血", "激昂", "24-36", "36-45", "45-55", "55-70", "上星剧", "传记"]}
}
The useful information, once formatted, is what is shown above: the directors, actors, and so on.
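A minimal sketch of reading one such line (the sample below is shortened and uses placeholder values, with the same field layout as the sample above):

```python
import json

# One shortened wiki line; field names follow the sample shown earlier.
line = '{"title": "ShowA", "wikiData": {"wikiInfo": {"country": "中国大陆", "directors": ["DirX"], "actors": ["A1", "A2"]}, "wikiTags": ["电视剧", "战争"]}}'

item = json.loads(line)
info = item["wikiData"]["wikiInfo"]          # nested dict with country/language/cast
tags = item["wikiData"].get("wikiTags", [])  # genre and audience tags
print(item["title"], info["directors"], tags)
```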
Next, following the pipeline we worked out while analyzing the project, we can carry out the rework.
Reading and inserting the data
This corresponds to kg.py. First, define our directory path:
data_dir = "C:\\Users\\songzeceng\\Desktop\\wiki\\"
Then iterate over the files in this directory, reading and parsing each one; the code is as follows.
def insert_data_from_txt(self, file_path):
    try:
        with open(file=file_path, mode="r", encoding="utf-8") as f:
            for line in f.readlines():
                item = json.loads(line)
                if 'title' not in item.keys():
                    continue
                title = self.look_and_create(item['title'])
                if 'wikiData' not in item.keys():
                    continue
                wikiData = item['wikiData']
                if 'wikiDesc' in wikiData.keys():
                    wikiDesc = self.look_and_create(wikiData['wikiDesc'])
                    self.create_sub_graph(entity1=title, entity2=wikiDesc, relation="desc")
                if 'wikiTags' in wikiData.keys():
                    for tag in wikiData['wikiTags']:
                        tag = self.look_and_create(tag)
                        self.create_sub_graph(entity1=title, entity2=tag, relation="tag")
                wikiInfo = wikiData['wikiInfo']
                if 'country' in wikiInfo.keys():
                    country = self.look_and_create(wikiInfo['country'])
                    self.create_sub_graph(entity1=title, entity2=country, relation="country")
                if 'language' in wikiInfo.keys():
                    language = self.look_and_create(wikiInfo['language'])
                    self.create_sub_graph(entity1=title, entity2=language, relation="language")
                if 'actors' in wikiInfo.keys():
                    for actor in wikiInfo['actors']:
                        actor = self.look_and_create(actor)
                        self.create_sub_graph(entity1=title, entity2=actor, relation="actor")
                if 'directors' in wikiInfo.keys():
                    for director in wikiInfo['directors']:
                        director = self.look_and_create(director)
                        self.create_sub_graph(entity1=title, entity2=director, relation="director")
        print(file_path, "done")
    except Exception as e:
        print("File " + file_path + " read error: " + str(e))
It looks long, but it just parses each field, first looking up or creating the entity via look_and_create(). Because my py2neo version differs from the original project's, I rewrote that function as follows.
def look_and_create(self, name):
    matcher = NodeMatcher(self.graph)
    end = matcher.match("car_industry", name=name).first()
    if end is None:
        end = Node('car_industry', name=name)
    return end
Then the entity relationships are created in create_sub_graph(); the code is as follows.
def create_sub_graph(self, entity1, relation, entity2):
    r = Relationship(entity1, relation, entity2, name=relation)
    self.graph.create(r)
The whole kg.py file looks like this:
# coding:utf-8
'''
Created on 2018年1月26日
@author: qiujiahao
@email: 997018209@qq
'''
import sys
import re
import os
import json

sys.path.append('..')

from conf import get_args
from py2neo import Node, Relationship, Graph, NodeMatcher
import pandas as pd

data_dir = "C:\\Users\\songzeceng\\Desktop\\wiki\\"


class data(object):
    def __init__(self):
        self.args = get_args()
        self.data_process()

    def data_process(self):
        # Initialize the connection, then insert the data
        self.data_init()
        print("Data preprocessing done")

    def data_init(self):
        # Connect to the graph database
        print('Start data preprocessing')
        self.graph = Graph('http://localhost:7474', user="neo4j", password="szc")
        # self.graph.delete_all()
        file_names = os.listdir(data_dir)
        for file_name in file_names:
            self.insert_data_from_txt(data_dir + file_name)

    def insert_data_from_txt(self, file_path):
        try:
            with open(file=file_path, mode="r", encoding="utf-8") as f:
                for line in f.readlines():
                    item = json.loads(line)
                    if 'title' not in item.keys():
                        continue
                    title = self.look_and_create(item['title'])
                    # id = self.look_and_create(item['id'])
                    # self.create_sub_graph(entity1=title, entity2=id, relation="title")
                    if 'wikiData' not in item.keys():
                        continue
                    wikiData = item['wikiData']
                    if 'wikiDesc' in wikiData.keys():
                        wikiDesc = self.look_and_create(wikiData['wikiDesc'])
                        self.create_sub_graph(entity1=title, entity2=wikiDesc, relation="desc")
                    if 'wikiTags' in wikiData.keys():
                        for tag in wikiData['wikiTags']:
                            tag = self.look_and_create(tag)
                            self.create_sub_graph(entity1=title, entity2=tag, relation="tag")
                    wikiInfo = wikiData['wikiInfo']
                    if 'country' in wikiInfo.keys():
                        country = self.look_and_create(wikiInfo['country'])
                        self.create_sub_graph(entity1=title, entity2=country, relation="country")
                    if 'language' in wikiInfo.keys():
                        language = self.look_and_create(wikiInfo['language'])
                        self.create_sub_graph(entity1=title, entity2=language, relation="language")
                    if 'actors' in wikiInfo.keys():
                        for actor in wikiInfo['actors']:
                            actor = self.look_and_create(actor)
                            self.create_sub_graph(entity1=title, entity2=actor, relation="actor")
                    if 'directors' in wikiInfo.keys():
                        for director in wikiInfo['directors']:
                            director = self.look_and_create(director)
                            self.create_sub_graph(entity1=title, entity2=director, relation="director")
            print(file_path, "done")
        except Exception as e:
            print("File " + file_path + " read error: " + str(e))

    def create_sub_graph(self, entity1, relation, entity2):
        r = Relationship(entity1, relation, entity2, name=relation)
        self.graph.create(r)

    def look_and_create(self, name):
        matcher = NodeMatcher(self.graph)
        end = matcher.match("car_industry", name=name).first()
        if end is None:
            end = Node('car_industry', name=name)
        return end


if __name__ == '__main__':
    data = data()
Running it, the command-line output is as shown below.
The data is not well-formed and many files fail to parse; never mind, it is just a demo. In the Neo4j database, fetching 25 records gives the result shown below.
Running the service
Here, simply change the IP and port in run_server.py to your own.
Handling requests
This step corresponds to views.py.
First we need to intercept GET requests on the /KnowGraph/v2 path, so we add a route-decorated function, as shown below.
@app.route('/KnowGraph/v2', methods=["GET"])
def getInfoFromServer():
    pass
Then we just implement this function. First comes parameter handling; our full request URL looks like this:
http://localhost:8090/KnowGraph/v2?method=entry&jsonrpc=2.0&id=1&params=entry=上将许世友-deep=2
There are quite a few parameters, and many of them are fixed (jsonrpc, id, and so on), so I simplified it to
http://localhost:8090/KnowGraph/v2?name=上将许世友
Then in getInfoFromServer() we just fill in all the default parameters; the code is as follows.
def handle_args(originArgs):
    if 'name' not in originArgs.keys():
        return None
    args = {}
    for item in originArgs:
        key = item
        value = originArgs[key]
        if key == "params":
            kvs = str(value).split("-")
            kv_dic = {}
            for item in kvs:
                kv = item.split("=")
                k = kv[0]
                v = kv[1]
                if v.isnumeric():
                    kv_dic[k] = int(v)
                else:
                    kv_dic[k] = v
            args[key] = kv_dic
        else:
            if value.isnumeric():
                args[key] = int(value)
            else:
                args[key] = value
    if 'params' not in args.keys():
        args['params'] = {'name': args['name']}
        args.pop('name')
    args['params']['name'] = args['params']['name'].replace('\'', '\\\'')
    if 'method' not in args.keys():
        args['method'] = 'entry'
    if 'deep' not in args['params'].keys():
        args['params']['deep'] = 2
    if 'jsonrpc' not in args.keys():
        args['jsonrpc'] = 2.0
    if 'id' not in args.keys():
        args['id'] = 1
    return args
It is really just iteration and default filling.
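The effect of that filling is easy to demonstrate in isolation. fill_defaults below is a trimmed-down, hypothetical stand-in for handle_args(), covering only the bare ?name=... case:

```python
# Trimmed-down stand-in for handle_args(): expand a bare {'name': ...}
# query into the full parameter dict the lookup code expects.
def fill_defaults(origin):
    if 'name' not in origin:
        return None
    args = {'params': {'name': origin['name'], 'deep': 2}}
    args.setdefault('method', 'entry')
    args.setdefault('jsonrpc', 2.0)
    args.setdefault('id', 1)
    return args

print(fill_defaults({'name': 'ShowA'}))
# {'params': {'name': 'ShowA', 'deep': 2}, 'method': 'entry', 'jsonrpc': 2.0, 'id': 1}
```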
Once the parameters are handled, we can dispatch on the method field to run the appropriate query, then take the result from server_param's result field and hand it to the front end to render the page. So getInfoFromServer() can be written as follows.
@app.route('/KnowGraph/v2', methods=["GET"])
def getInfoFromServer():
    args = handle_args(request.args.to_dict())
    kg = KnowGraph(args)
    client_params = args
    server_param = {}
    if client_params['method'] == 'entry':
        kg.lookup_entry(client_params, server_param)
    server_param['id'] = client_params['id']
    server_param['jsonrpc'] = client_params['jsonrpc']
    server_param['method'] = client_params['method']
    print("server_param:\n", server_param)
    global mydata
    if 'result' in server_param.keys():
        mydata = server_param['result']
    else:
        mydata = '{}'
    print("mydata:\n", mydata)
    return render_template("index.html")
Here we only handle the entity query, because our input is simply the name of a show the user watched.
When the page renders, it fetches data via the /KnowGraph/data path, so we need to intercept that too; the code is as follows.
@app.route("/KnowGraph/data")
def data():
    print("data:", mydata)
    return mydata
The whole views.py file looks like this:
# coding:utf-8
'''
Created on 2018年1月9日
@author: qiujiahao
@email: 997018209@qq
'''
from flask import jsonify
from conf import *
from flask import Flask
from flask import request, render_template
from server.app import app
import tensorflow as tf
from server.module import KnowGraph
import json

mydata = ""

# http://210.41.97.89:8090/KnowGraph/v2?name=胜利之路
# http://113.54.234.209:8090/KnowGraph/v2?name=孤战
# http://localhost:8090/KnowGraph/v2?method=entry_to_property&jsonrpc=2.0&id=1&params=entry=水冶-property=位置


@app.route('/KnowGraph/v2', methods=["GET"])
def getInfoFromServer():
    args = handle_args(request.args.to_dict())
    kg = KnowGraph(args)
    client_params = args
    server_param = {}
    if client_params['method'] == 'entry':
        kg.lookup_entry(client_params, server_param)
    server_param['id'] = client_params['id']
    server_param['jsonrpc'] = client_params['jsonrpc']
    server_param['method'] = client_params['method']
    print("server_param:\n", server_param)
    global mydata
    if 'result' in server_param.keys():
        mydata = server_param['result']
    else:
        mydata = '{}'
    print("mydata:\n", mydata)
    return render_template("index.html")


def handle_args(originArgs):
    if 'name' not in originArgs.keys():
        return None
    args = {}
    for item in originArgs:
        key = item
        value = originArgs[key]
        if key == "params":
            kvs = str(value).split("-")
            kv_dic = {}
            for item in kvs:
                kv = item.split("=")
                k = kv[0]
                v = kv[1]
                if v.isnumeric():
                    kv_dic[k] = int(v)
                else:
                    kv_dic[k] = v
            args[key] = kv_dic
        else:
            if value.isnumeric():
                args[key] = int(value)
            else:
                args[key] = value
    if 'params' not in args.keys():
        args['params'] = {'name': args['name']}
        args.pop('name')
    args['params']['name'] = args['params']['name'].replace('\'', '\\\'')
    if 'method' not in args.keys():
        args['method'] = 'entry'
    if 'deep' not in args['params'].keys():
        args['params']['deep'] = 2
    if 'jsonrpc' not in args.keys():
        args['jsonrpc'] = 2.0
    if 'id' not in args.keys():
        args['id'] = 1
    return args


@app.route("/KnowGraph/data")
def data():
    print("data:", mydata)
    return mydata
Database queries
Finally, we turn our attention to the database queries and result analysis in module.py.
For easy inspection we write the results to a JSON file, so in memory the query results are kept in a dict. Before each query the dict is cleared, then the query runs, and different parsing logic follows depending on whether anything was found. lookup_entry() can therefore be written as follows.
def lookup_entry(self, client_params, server_param):
    # Supports setting the search depth
    start_time = time.time()
    params = client_params["params"]
    edges = set()
    sim_dict.clear()
    self.lookup_entry_deep(edges, params, 0)
    if len(edges) == 0:
        server_param['success'] = 'false'
    else:
        self.handleResult(edges, server_param, start_time)
The entity lookup itself lives in lookup_entry_deep(). Generally our depth is only two levels: on the first level we query the properties of the user's show, e.g. the director of 上将许世友; on the second level we take each property and look up the entities it points back to, e.g. which other shows that director has directed. Clearly the first level is a forward lookup and the second a reverse lookup.
While searching, to avoid recommending the show the user just watched, we also deduplicate the results. Say we search around 上将XXX: when we find its director 安澜 and then do the reverse lookup on 安澜, if 安澜 has directed only this one show, there is no point (and it would be wrong) to add it to the recommendation list.
For that "found nothing new" case I defined the return value 'nothing else'; if nothing at all was found, it is 'nothing got'; if the depth limit is exceeded, 'deep out'; and if everything is normal, 'ok'.
We first run the queries in both directions; the code is as follows.
result1 = self.graph.run(cypher='''match (s)-[r]->(e) where s.name='{}' return s.name,r.name,e.name'''.format(params['name'])).data()
result2 = self.graph.run(cypher='''match (e)<-[r]-(s) where e.name='{}' return s.name,r.name,e.name'''.format(params['name'])).data()
Then we check both results; if both are empty, return 'nothing got'.
if len(result1) == 0 and len(result2) == 0:
    return 'nothing got'
If result2 (the reverse lookup) has only one row, and that row's s.name (the show name) is still the input entity while its e.name (the property name) is still the original property, return 'nothing else' directly.
if len(result2) == 1:
    item = result2[0]
    if origin_tv_name is not None and origin_property_name is not None:
        if origin_property_name == item['e.name'] and origin_tv_name == item['s.name']:
            return 'nothing else'
Here origin_tv_name and origin_property_name are both parameters of lookup_entry_deep(), defaulting to None.
Next we iterate over the forward results in result1; for each property value (e.name) of the original show (s.name), we recurse one level deeper to run the reverse lookup on that property.
for item in result1:
    tv_name = item['s.name']
    property_name = item['e.name']
    has_result = False
    if tv_name != property_name:  # avoid the 双面胶:中文名:双面胶 infinite loop
        if oldName != property_name:
            params['name'] = property_name
            has_result = self.lookup_entry_deep(edges, params.copy(), deep + 1,
                                                origin_tv_name=tv_name,
                                                origin_property_name=property_name)
oldName is the entity name of the current query; the extra check guards against infinite loops, though in our scenario it always passes.
Next we analyze the reverse results. If a new show is found, we first derive a similarity score from the relation linking the new show to the property. Then we add the new show, the shared property, and the score to the similarity dict and to the edges set, either accumulating into an existing entry or creating a new one; the code is as follows.
for item in result2:
    tv_name = item['s.name']
    property_name = item['e.name']
    relation_name = item['r.name']
    if tv_name != origin_tv_name:
        score = get_sim_score_accroding_to_relation(relation_name)
        if tv_name not in sim_dict.keys():
            sim_dict[tv_name] = {relation_name: [property_name], "similarity": score}
        else:
            item_dict = sim_dict[tv_name]
            if relation_name in item_dict.keys() and \
                    property_name in item_dict.values():
                continue
            if relation_name in item_dict.keys():
                item_dict[relation_name].append(property_name)
            else:
                item_dict[relation_name] = [property_name]
            item_dict["similarity"] += score
        edges.add((tv_name, relation_name, property_name))
The function that maps a relation to a similarity score, get_sim_score_accroding_to_relation(), is shown below.
def get_sim_score_accroding_to_relation(relation_name):
    if relation_name in ['actor', 'director', 'tag']:
        return 1.0
    elif relation_name in ['language', 'country']:
        return 0.5
    return 0.0
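Put together, the scoring and accumulation amount to the following self-contained sketch (the candidate shows and properties are made up for illustration):

```python
# Relation weights, matching get_sim_score_accroding_to_relation() above.
WEIGHTS = {'actor': 1.0, 'director': 1.0, 'tag': 1.0, 'language': 0.5, 'country': 0.5}

# Shared (candidate show, relation, property) triples found by the reverse lookup.
shared = [
    ('ShowB', 'director', 'DirX'),
    ('ShowB', 'tag', 'war'),
    ('ShowC', 'language', '普通话'),
]

sim_dict = {}
for tv, rel, prop in shared:
    entry = sim_dict.setdefault(tv, {'similarity': 0.0})
    entry.setdefault(rel, []).append(prop)          # collect shared properties per relation
    entry['similarity'] += WEIGHTS.get(rel, 0.0)    # accumulate the relation's weight

print(sim_dict['ShowB'])  # {'similarity': 2.0, 'director': ['DirX'], 'tag': ['war']}
```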
The complete lookup_entry_deep() function is shown below.
# Depth-limited lookup
def lookup_entry_deep(self, edges, params, deep, origin_tv_name=None, origin_property_name=None):
    # The current depth must not reach the requested depth
    if deep >= params['deep']:
        return 'deep out'
    oldName = str(params['name'])
    if oldName.__contains__("\'") and not oldName.__contains__("\\\'"):
        params['name'] = oldName.replace("\'", "\\\'")
    # Forward and reverse lookups
    result1 = self.graph.run(cypher='''match (s)-[r]->(e) where s.name='{}' return s.name,r.name,e.name'''.format(params['name'])).data()
    result2 = self.graph.run(cypher='''match (e)<-[r]-(s) where e.name='{}' return s.name,r.name,e.name'''.format(params['name'])).data()
    if len(result1) == 0 and len(result2) == 0:
        return 'nothing got'
    if len(result2) == 1:
        item = result2[0]
        if origin_tv_name is not None and origin_property_name is not None:
            if origin_property_name == item['e.name'] and origin_tv_name == item['s.name']:
                return 'nothing else'
    for item in result1:
        tv_name = item['s.name']
        property_name = item['e.name']
        if tv_name != property_name:  # avoid the 双面胶:中文名:双面胶 infinite loop
            if oldName != property_name:
                params['name'] = property_name
                has_result = self.lookup_entry_deep(edges, params.copy(), deep + 1,
                                                    origin_tv_name=tv_name,
                                                    origin_property_name=property_name)
    for item in result2:
        has_result = False
        tv_name = item['s.name']
        property_name = item['e.name']
        relation_name = item['r.name']
        if tv_name != origin_tv_name:
            score = get_sim_score_accroding_to_relation(relation_name)
            if tv_name not in sim_dict.keys():
                sim_dict[tv_name] = {relation_name: [property_name], "similarity": score}
            else:
                item_dict = sim_dict[tv_name]
                if relation_name in item_dict.keys() and \
                        property_name in item_dict.values():
                    continue
                if relation_name in item_dict.keys():
                    item_dict[relation_name].append(property_name)
                else:
                    item_dict[relation_name] = [property_name]
                item_dict["similarity"] += score
            edges.add((tv_name, relation_name, property_name))
    return 'ok'
When the query completes and there are results, handleResult() processes them for return or output. It mainly sorts by similarity from high to low, takes the top 20, and writes them to a JSON file; that part of the code is shown below.
def handleResult(self, edges, server_param, start_time):
    ....
    sorted_sim_list = sorted(sim_dict.items(), key=lambda x: x[1]['similarity'], reverse=True)
    ret = {}
    for i in range(len(sorted_sim_list)):
        if i >= 20:
            break
        ret[sorted_sim_list[i][0]] = sorted_sim_list[i][1]
    mydata = json.dumps(ret, ensure_ascii=False)
    print('Json path: %s' % (fname))
    self.clear_and_write_file(fname, mydata)

def clear_and_write_file(self, fname, mydata):
    with open(fname, 'w', encoding='utf-8') as f:
        f.write(str(""))
    with open(fname, 'a', encoding='utf-8') as f:
        f.write(str(mydata))
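The ranking step on its own looks like this (with a toy sim_dict; the real one holds the accumulated scores from the lookup):

```python
# Toy similarity dict; each value carries the accumulated 'similarity' score.
sim_dict = {
    'ShowC': {'similarity': 12.0},
    'ShowB': {'similarity': 14.0},
    'ShowD': {'similarity': 13.0},
}

# Sort candidates by score, highest first, then keep the top N (20 in handleResult).
ranked = sorted(sim_dict.items(), key=lambda kv: kv[1]['similarity'], reverse=True)
top = dict(ranked[:2])  # top 2 here, for brevity
print(list(top))  # ['ShowB', 'ShowD']
```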
Besides that, I also put the results into server_param to output them to the front-end page; that part of the code is shown below.
ret = []
for result in edges:
    ret.append({"source": result[0], "target": result[2], "relation": result[1], "label": "relation"})
print("ret:", ret)
server_param['result'] = {"edges": ret}
server_param['success'] = 'true'
print('Triples found: {}, time taken: {}s'.format(len(ret), time.time() - start_time))
The complete result-handling function is shown below.
def handleResult(self, edges, server_param, start_time):
    ret = []
    for result in edges:
        ret.append({"source": result[0], "target": result[2], "relation": result[1], "label": "relation"})
    print("ret:", ret)
    server_param['result'] = {"edges": ret}
    server_param['success'] = 'true'
    print('Triples found: {}, time taken: {}s'.format(len(ret), time.time() - start_time))
    sorted_sim_list = sorted(sim_dict.items(), key=lambda x: x[1]['similarity'], reverse=True)
    ret = {}
    for i in range(len(sorted_sim_list)):
        if i >= 20:
            break
        ret[sorted_sim_list[i][0]] = sorted_sim_list[i][1]
    mydata = json.dumps(ret, ensure_ascii=False)
    print('Json path: %s' % (fname))
    self.clear_and_write_file(fname, mydata)
Results
First start the service by running run_server.py, then enter the following URL in the browser's address bar (XXX is the input name):
http://210.41.97.169:8090/KnowGraph/v2?name=XXX
The page output is as follows.
The result is quite sprawling, so let's look at the top 20 entries in the JSON file instead; the output is as follows.
{"XXX元帅": {"actor": ["侯勇","刘劲"],"similarity": 14.0,"language": ["普通话"],"country": ["中国大陆"],"tag": ["传记","上星剧","55-70","45-55","36-45","24-36","热血","革命","战争","历史","电视剧"]},"BBB": {"actor": ["刘劲","王伍福"],"similarity": 14.0,"language": ["普通话"],"country": ["中国大陆"],"tag": ["传记","上星剧","55-70","45-55","36-45","24-36","热血","革命","战争","历史","电视剧"]},"长征大会师": {"actor": ["刘劲","王伍福"],"similarity": 14.0,"language": ["普通话"],"country": ["中国大陆"],"tag": ["上星剧","55-70","45-55","36-45","24-36","激昂","热血","革命","战争","历史","电视剧"]},"战将": {"language": ["普通话"],"similarity": 13.0,"country": ["中国大陆"],"tag": ["传记","上星剧","55-70","45-55","36-45","24-36","热血","动作","革命","战争","历史","电视剧"]},"炮神": {"language": ["普通话"],"similarity": 13.0,"country": ["中国大陆"],"tag": ["上星剧","55-70","45-55","36-45","24-36","激昂","动作","革命","军旅","战争","历史","电视剧"]},"独立纵队": {"language": ["普通话"],"similarity": 13.0,"country": ["中国大陆"],"tag": ["上星剧","55-70","45-55","36-45","24-36","激昂","热血","动作","革命","战争","历史","电视剧"]},"女子军魂": {"language": ["普通话"],"similarity": 13.0,"country": ["中国大陆"],"tag": ["上星剧","55-70","45-55","36-45","24-36","激昂","热血","革命","军旅","战争","历史","电视剧"]},"热血军旗": {"actor": ["侯勇"],"similarity": 12.0,"language": ["普通话"],"country": ["中国大陆"],"tag": ["上星剧","55-70","45-55","36-45","热血","动作","革命","战争","历史","电视剧"]},"擒狼": {"language": ["普通话"],"similarity": 12.0,"country": ["中国大陆"],"tag": ["上星剧","55-70","45-55","36-45","24-36","激昂","动作","革命","战争","历史","电视剧"]},"信者无敌": {"language": ["普通话"],"similarity": 12.0,"country": ["中国大陆"],"tag": ["上星剧","55-70","45-55","36-45","24-36","激昂","热血","革命","战争","历史","电视剧"]},"我的抗战之猎豹突击": {"language": ["普通话"],"similarity": 12.0,"country": ["中国大陆"],"tag": ["上星剧","55-70","45-55","36-45","24-36","激昂","热血","革命","战争","历史","电视剧"]},"魔都风云": {"language": ["普通话"],"similarity": 12.0,"country": ["中国大陆"],"tag": ["上星剧","55-70","45-55","36-45","24-36","激昂","热血","动作","革命","战争","电视剧"]},"英雄戟之影子战士": {"language": ["普通话"],"similarity": 12.0,"country": ["中国大陆"],"tag": 
["55-70","45-55","36-45","24-36","激昂","热血","动作","革命","战争","历史","电视剧"]},"第一声枪响": {"language": ["普通话"],"similarity": 12.0,"country": ["中国大陆"],"tag": ["上星剧","55-70","45-55","36-45","24-36","激昂","热血","革命","战争","历史","电视剧"]},"亮剑": {"language": ["普通话"],"similarity": 12.0,"country": ["中国大陆"],"tag": ["上星剧","45-55","36-45","24-36","激昂","热血","动作","革命","战争","历史","电视剧"]},"飞虎队": {"language": ["普通话"],"similarity": 12.0,"country": ["中国大陆"],"tag": ["上星剧","45-55","36-45","24-36","激昂","热血","动作","革命","战争","历史","电视剧"]},"伟大的转折": {"language": ["普通话"],"similarity": 12.0,"country": ["中国大陆"],"tag": ["上星剧","55-70","45-55","36-45","24-36","激昂","热血","革命","战争","历史","电视剧"]},"太行英雄传": {"language": ["普通话"],"similarity": 12.0,"country": ["中国大陆"],"tag": ["上星剧","45-55","36-45","24-36","激昂","热血","动作","革命","战争","历史","电视剧"]},"雪豹": {"language": ["普通话"],"similarity": 12.0,"country": ["中国大陆"],"tag": ["上星剧","55-70","45-55","36-45","24-36","激昂","革命","军旅","战争","历史","电视剧"]},"宜昌保卫战": {"actor": ["侯勇"],"similarity": 11.0,"language": ["普通话"],"country": ["中国大陆"],"tag": ["上星剧","45-55","36-45","24-36","激昂","革命","战争","历史","电视剧"]}
}
The top entries are all shows closely related to our input, with their similarity scores and shared properties listed right alongside; the results look decent.
Conclusion
This is just a demo, meant to get a feel for applying knowledge graphs in a recommendation system.
Finally, thanks again to the author of the original project; without the framework he built through hard work, I could hardly have taken this first practical step.
Once again, the address of the original project:
一个简单的基于知识图谱的影视剧推荐系统
背景
这是去年九月份在研究知识图谱与推荐时做的一个Demo项目,源自于在github上找到一个关于汽车行业的知识图谱开源项目。我主要对它进行了一些改造,使之变成了一个基于知识图谱的影视剧推荐系统。
环境
python3、flask前端框架、图数据库neo4j(3.3.1)
操作系统为windows10
项目框架
把上面的汽车项目clone下来后,整个的项目结构如下图所示
里面有两个项目版本,第一次验收和第二次验收,两者主要区别是用的数据库不同,前者用的是mysql,后者用的是neo4j。我主要是基于第二次验收进行改造的。打开第二次验收的项目,里面的结构如下图所示
流程分析
下面,我们就原始项目的工作流程,进行一步一步的分析,因为只有这样,才能完成对其的改造。
数据的读取和插入
首先我们肯定需要把数据插入到neo4j里,那么上来我们就得启动neo4j,打开cmd,输入以下命令
neo4j console
然后如若cmd显示下面的消息,neo4j就启动完成了
最后一行显示的可用地址http://localhost:7474就是我们访问neo4j的地址,打开浏览器,把这个地址拷到地址栏里,敲下回车,就会看到neo4j的控制台界面 ,如下图所示
数据库启动完事之后,就可以打开项目里kg\kg.py文件了,在这里面,主要代码如下所示
def data_init(self):# 连接图数据库print('开始数据预处理')self.graph = Graph('http://localhost:7474', user="neo4j", password="szc")self.selector = NodeSelector(self.graph)self.graph.delete_all()def insert_datas(self):print('开始插入数据')with open('../data/tuples/three_tuples_2.txt', 'r', encoding='utf-8') as f:lines, num = f.readlines(), -1for line in lines:num += 1if num % 500 == 0:print('当前处理进度:{}/{}'.format(lines.index(line), len(lines)))line = line.strip().split(' ')if len(line) != 3:print('insert_datas错误:', line)continueself.insert_one_data(line)def insert_one_data(self, line):if '' in line:print('insert_one_data错误', line)returnstart = self.look_and_create(line[0])for name in self.get_items(line[2]):end = self.look_and_create(name)r = Relationship(start, line[1], end, name=line[1])self.graph.create(r) # 当存在时不会创建新的# 查找节点是否不存,不存在就创建一个def look_and_create(self, name):end = self.graph.find_one(label="car_industry", property_key="name", property_value=name)if end == None:end = Node('car_industry', name=name)return enddef get_items(self, line):if '{' not in line and '}' not in line:return [line]# 检查if '{' not in line or '}' not in line:print('get_items Error', line)lines = [w[1:-1] for w in re.findall('{.*?}', line)]return lines
最上面的data_init()函数,是用来连接neo4j数据库的,传入数据库地址、用户名、密码就可以了。然后调用graph.delete_all()函数,在插入数据前,先对原来的数据进行清空,这一步要根据自己的业务场景酌情考虑,是否保留。
然后是insert_datas()函数,这个函数就是读取txt文件,遍历每一行,对每一行调用insert_one_data()函数,进行每一行的解析,结点和关系的创建。根据代码可以发现,每一行的数据都是“起点 关系 终点”的形式,比如“安阳 位置 豫北”,就表示实体安阳和实体豫北的关系是位置,而且,顺序是安阳-->位置-->豫北。
调用insert_one_data()函数时,会先查询数据库里是否有这一个同名结点,根据结果决定是复用已有的还是建一个新的,这个过程对应函数look_and_create()。
在函数look_and_create()里,“car_industry”是数据库的标签(我理解是对应Mysql里每个数据库的名字,要用到哪个就调用命令use database some_database),然后find_one()函数里,property_name的值对应创建结点时Node的构造函数的参数名name,property_value就是Node的构造函数的name参数值,也就是实体的名字。拿我的故乡——安阳市实体为例,它在neo4j里的存储结构就可以理解为{property_name: "name", property_value: "安阳"}。
最后的get_items()函数就是实体的合法性检验,不做过多解读。
运行服务
数据全部插入数据库中后,就可以运行我们的服务了,文件对应run_server.py,里面代码如下
if __name__ == '__main__':args=get_args()print('\nhttp_host:{},http_port:{}'.format('localhost',args.http_port))app.run(debug=True, host='210.41.97.169', port=8090)
其实关键就是一句app.run()函数,把里面的Ip和端口换成自己就可以
处理页面请求
我们的业务逻辑是:在浏览器输入url和参数,获取相关结果。
其中,处理我们的参数的过程,对应文件views.py,里面的主要代码如下
@app.route('/KnowGraph/v2',methods=["POST"])
def look_up():kg=KnowGraph(get_args())client_params=request.get_json(force=True)server_param={}if client_params['method'] == 'entry_to_entry':kg.lookup_entry2entry(client_params,server_param)elif client_params['method'] == 'entry_to_property':kg.lookup_entry2property(client_params,server_param)elif client_params['method'] == 'entry':kg.lookup_entry(client_params,server_param)elif client_params['method'] == 'statistics':kg.lookup_statistics(client_params,server_param)elif client_params['method'] == 'live':params={'success':'true'}server_param['result']=params server_param['id']=client_params['id']server_param['jsonrpc']=client_params['jsonrpc']server_param['method']=client_params['method']print(server_param)return json.dumps(server_param, ensure_ascii=False).encode("utf-8")
可以看到,/KnowGraph/v2路径的post方法会路由到look_up函数里,里面根据参数method的值,调用kg对象的不同函数,执行不同的查询逻辑。
但是,我们在浏览器输入路径和参数然后敲下回车后,是要获取数据库信息,显然是对应的get方法。而且,关于向flask模板传递数据的路由也没写上,所以这个文件我们要进行大改。
数据查询
方才说到,views.py文件里会根据参数method的值的不同,调用kg对象的不同函数,来获取不同的结果。
而kg对象所属的KnowledgeGraph类,在文件modules.py里。以最简单也是最基本的对实体查询为例,我们看看其是怎么实现的,这对应lookup_entry函数,代码如下
def lookup_entry(self,client_params,server_param):#支持设定网络查找的深度start_time = time.time()params=client_params["params"]edges=set()self.lookup_entry_deep(edges,params,0)if len(edges)==0:server_param['result']={"success":'false'}else: server_param['result']={'edges':[list(i) for i in edges],"success":'true'}print('本次查找三元组的数量为:{},耗时:{}s'.format(len(edges),time.time()-start_time))
除了计时外,主要将客户端参数里的params取出来,里面包含要查找的实体名和查找深度,然后调用lookup_entry_deep函数进行查找,结果保存在edges集合里,最后将edges集合的每一项做为列表的列表的每一项,存储在server_params的'results'项中的'edges'里,进行返回。
下面,我们就看一下lookup_entry_deep函数的实现,代码如下
def lookup_entry_deep(self,edges,params,deep):#当前查找深度不得等于要求的深度if deep >= params['deep']:return#正向查找result1=self.graph.data("match (s)-[r]->(e) where s.name='{}' return s.name,r.name,e.name".format(params['name']))result2=self.graph.data("match (e)<-[r]-(s) where e.name='{}' return s.name,r.name,e.name".format(params['name']))if len(result1)==0 and len(result2)==0:returnfor item in result1:edges.add((item['s.name'],item['r.name'],item['e.name']))if item['s.name'] != item['e.name']:#避免出现:双面胶:中文名:双面胶的死循环params['name']=item['e.name']self.lookup_entry_deep(edges,params.copy(),deep+1)for item in result2:edges.add((item['s.name'],item['r.name'],item['e.name']))if item['s.name'] != item['e.name']:#避免出现:双面胶:中文名:双面胶的死循环params['name']=item['e.name']self.lookup_entry_deep(edges,params.copy(),deep+1)
首先,如果深度超标,就直接返回。然后先后针对params里的name项,也就是要查找的实体名,在数据库里进行正向和逆向的查询,然后把每一项做为元组保存在edges集合里,并递归调用这个函数,同时深度+1
改造
现有的流程就如上文所言,接下来,我们针对影视剧推荐的业务场景,对其进行改造。
假设有个用户观看了电视剧《上将XXX》,我们可以根据导演、演员、上映地、语种、类型标签等为其推荐他可能感兴趣的影视剧。
数据格式
我们的文件都保存在wiki目录里,均为txt文件,里面一行行的都是json,其中一行内容如下
{..... "title": "上将XXX", "wikiData": {....."wikiInfo": {"country": "中国大陆", "language": "普通话", "directors": ["安澜"], "actors": ["宋春丽", "王伍福", "张秋歌", "范明", "刘劲", "陶慧敏", "侯勇"], ....}, ...."wikiTags": ["电视剧", "历史", "战争", "军旅", "革命", "动作", "热血", "激昂", "24-36", "36-45", "45-55", "55-70", "上星剧", "传记"]}
}
里面有用的信息格式化后就像上面显示的,导演演员之类的。
接下来,我们就可以根据解析项目时理出的流程,进行改造
数据读取和插入
这对应kg.py文件,首先定义我们的目录路径
data_dir = "C:\\Users\\songzeceng\\Desktop\\wiki\\"
然后遍历这个目录下的文件,对每个文件进行读取和解析,代码如下
def insert_data_from_txt(self, file_path):try:with open(file=file_path, mode="r", encoding="utf-8") as f:for line in f.readlines():item = json.loads(line)if 'title' not in item.keys():continuetitle = self.look_and_create(item['title'])if 'wikiData' not in item.keys():continuewikiData = item['wikiData']if 'wikiDesc' in wikiData.keys():wikiDesc = self.look_and_create(wikiData['wikiDesc'])self.create_sub_graph(entity1=title, entity2=wikiDesc, relation="desc")if 'wikiTags' in wikiData.keys():for tag in wikiData['wikiTags']:tag = self.look_and_create(tag)self.create_sub_graph(entity1=title, entity2=tag, relation="tag")wikiInfo = wikiData['wikiInfo']if 'country' in wikiInfo.keys():country = self.look_and_create(wikiInfo['country'])self.create_sub_graph(entity1=title, entity2=country, relation="country")if 'language' in wikiInfo.keys():language = self.look_and_create(wikiInfo['language'])self.create_sub_graph(entity1=title, entity2=language, relation="language")if 'actors' in wikiInfo.keys():for actor in wikiInfo['actors']:actor = self.look_and_create(actor)self.create_sub_graph(entity1=title, entity2=actor, relation="actor")if 'directors' in wikiInfo.keys():for director in wikiInfo['directors']:actor = self.look_and_create(director)self.create_sub_graph(entity1=title, entity2=actor, relation="director")print(file_path, "读取完毕")except Exception as e:print("文件" + file_path + "读取异常:" + str(e))pass
看着长,其实就是解析每一项,先查找或创建实体,对应函数look_and_create。由于我的py2neo版本和原项目里的不一样,所以对这个函数进行了改写,代码如下
def look_and_create(self, name):
    matcher = NodeMatcher(self.graph)
    end = matcher.match("car_industry", name=name).first()
    if end is None:
        end = Node('car_industry', name=name)
    return end
Then create the relationship between the two entities, via the function create_sub_graph:
def create_sub_graph(self, entity1, relation, entity2):
    r = Relationship(entity1, relation, entity2, name=relation)
    self.graph.create(r)
The complete kg.py file looks like this:
# coding:utf-8
'''
Created on January 26, 2018
@author: qiujiahao
@email: 997018209@qq
'''
import sys
import re
import os

sys.path.append('..')

from conf import get_args
from py2neo import Node, Relationship, Graph, NodeMatcher
import pandas as pd
import json

data_dir = "C:\\Users\\songzeceng\\Desktop\\wiki\\"


class data(object):
    def __init__(self):
        self.args = get_args()
        self.data_process()

    def data_process(self):
        # Initialization and data insertion
        self.data_init()
        print("Data preprocessing finished")

    def data_init(self):
        # Connect to the graph database
        print('Starting data preprocessing')
        self.graph = Graph('http://localhost:7474', user="neo4j", password="szc")
        # self.graph.delete_all()
        file_names = os.listdir(data_dir)
        for file_name in file_names:
            self.insert_data_from_txt(data_dir + file_name)

    def insert_data_from_txt(self, file_path):
        try:
            with open(file=file_path, mode="r", encoding="utf-8") as f:
                for line in f.readlines():
                    item = json.loads(line)
                    if 'title' not in item.keys():
                        continue
                    title = self.look_and_create(item['title'])
                    # id = self.look_and_create(item['id'])
                    # self.create_sub_graph(entity1=title, entity2=id, relation="title")
                    if 'wikiData' not in item.keys():
                        continue
                    wikiData = item['wikiData']
                    if 'wikiDesc' in wikiData.keys():
                        wikiDesc = self.look_and_create(wikiData['wikiDesc'])
                        self.create_sub_graph(entity1=title, entity2=wikiDesc, relation="desc")
                    if 'wikiTags' in wikiData.keys():
                        for tag in wikiData['wikiTags']:
                            tag = self.look_and_create(tag)
                            self.create_sub_graph(entity1=title, entity2=tag, relation="tag")
                    wikiInfo = wikiData.get('wikiInfo', {})  # guard against records without wikiInfo
                    if 'country' in wikiInfo.keys():
                        country = self.look_and_create(wikiInfo['country'])
                        self.create_sub_graph(entity1=title, entity2=country, relation="country")
                    if 'language' in wikiInfo.keys():
                        language = self.look_and_create(wikiInfo['language'])
                        self.create_sub_graph(entity1=title, entity2=language, relation="language")
                    if 'actors' in wikiInfo.keys():
                        for actor in wikiInfo['actors']:
                            actor = self.look_and_create(actor)
                            self.create_sub_graph(entity1=title, entity2=actor, relation="actor")
                    if 'directors' in wikiInfo.keys():
                        for director in wikiInfo['directors']:
                            director_node = self.look_and_create(director)
                            self.create_sub_graph(entity1=title, entity2=director_node, relation="director")
            print(file_path, "read complete")
        except Exception as e:
            print("Error reading file " + file_path + ": " + str(e))

    def create_sub_graph(self, entity1, relation, entity2):
        r = Relationship(entity1, relation, entity2, name=relation)
        self.graph.create(r)

    def look_and_create(self, name):
        matcher = NodeMatcher(self.graph)
        end = matcher.match("car_industry", name=name).first()
        if end is None:
            end = Node('car_industry', name=name)
        return end


if __name__ == '__main__':
    data = data()
Run it, and the command-line output is shown in the figure below.
The data is messy and many files could not be read, but since this is just a demo, we let that slide. Pulling 25 nodes back out of the Neo4j database gives the result shown below.
Running the server
Here, just change the IP and port in run_server.py to your own.
Handling requests
This step corresponds to views.py.
First we need to intercept GET requests on the /KnowGraph/v2 path, so we add a routed handler function:
@app.route('/KnowGraph/v2', methods=["GET"])
def getInfoFromServer():
    pass
Then we just implement this function. First, the request parameters: the full request URL looks like this:
http://localhost:8090/KnowGraph/v2?method=entry&jsonrpc=2.0&id=1&params=entry=上将许世友-deep=2
There are quite a few parameters, and many of them are fixed (jsonrpc, id, and so on), so I simplified it to:
http://localhost:8090/KnowGraph/v2?name=上将许世友
Then, inside getInfoFromServer(), we fill in all the default parameters; the code is:
def handle_args(originArgs):
    if 'name' not in originArgs.keys():
        return None
    args = {}
    for item in originArgs:
        key = item
        value = originArgs[key]
        if key == "params":
            kvs = str(value).split("-")
            kv_dic = {}
            for item in kvs:
                kv = item.split("=")
                k = kv[0]
                v = kv[1]
                if v.isnumeric():
                    kv_dic[k] = int(v)
                else:
                    kv_dic[k] = v
            args[key] = kv_dic
        else:
            if value.isnumeric():
                args[key] = int(value)
            else:
                args[key] = value
    if 'params' not in args.keys():
        args['params'] = {'name': args['name']}
        args.pop('name')
    args['params']['name'] = args['params']['name'].replace('\'', '\\\'')
    if 'method' not in args.keys():
        args['method'] = 'entry'
    if 'deep' not in args['params'].keys():
        args['params']['deep'] = 2
    if 'jsonrpc' not in args.keys():
        args['jsonrpc'] = 2.0
    if 'id' not in args.keys():
        args['id'] = 1
    return args
Essentially it is just iteration plus default filling.
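As a sanity check on the defaults, this sketch shows a hypothetical trimmed-down version of the normalization (covering only the simplified ?name=... form, not the full handle_args), so the shape of the resulting dict is visible:

```python
# Hypothetical mini version of handle_args for the simplified URL form only.
def fill_defaults(origin_args):
    if 'name' not in origin_args:
        return None
    return {
        'params': {'name': origin_args['name'].replace("'", "\\'"), 'deep': 2},
        'method': 'entry',
        'jsonrpc': 2.0,
        'id': 1,
    }

print(fill_defaults({'name': '上将许世友'}))
# {'params': {'name': '上将许世友', 'deep': 2}, 'method': 'entry', 'jsonrpc': 2.0, 'id': 1}
```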
Once the parameters are processed, we can branch on the method field to run the corresponding query, then take the result field out of server_param and hand it to the front end for page rendering. The getInfoFromServer() function can therefore be written as:
@app.route('/KnowGraph/v2', methods=["GET"])
def getInfoFromServer():
    args = handle_args(request.args.to_dict())
    kg = KnowGraph(args)
    client_params = args
    server_param = {}
    if client_params['method'] == 'entry':
        kg.lookup_entry(client_params, server_param)
    server_param['id'] = client_params['id']
    server_param['jsonrpc'] = client_params['jsonrpc']
    server_param['method'] = client_params['method']
    print("server_param:\n", server_param)
    global mydata
    if 'result' in server_param.keys():
        mydata = server_param['result']
    else:
        mydata = '{}'
    print("mydata:\n", mydata)
    return render_template("index.html")
Here we only handle entity queries, since our input is simply the title of a series the user has watched.
When rendering the page, the front end fetches its data from the /KnowGraph/data path, so we intercept that as well:
@app.route("/KnowGraph/data")
def data():
    print("data:", mydata)  # note: printing `data` here would print the function itself
    return mydata
The complete views.py file is:
# coding:utf-8
'''
Created on January 9, 2018
@author: qiujiahao
@email: 997018209@qq
'''
from flask import jsonify
from conf import *
from flask import Flask
from flask import request, render_template
from server.app import app
import tensorflow as tf
from server.module import KnowGraph
import json

mydata = ""

# http://210.41.97.89:8090/KnowGraph/v2?name=胜利之路
# http://113.54.234.209:8090/KnowGraph/v2?name=孤战
# http://localhost:8090/KnowGraph/v2?method=entry_to_property&jsonrpc=2.0&id=1&params=entry=水冶-property=位置


@app.route('/KnowGraph/v2', methods=["GET"])
def getInfoFromServer():
    args = handle_args(request.args.to_dict())
    kg = KnowGraph(args)
    client_params = args
    server_param = {}
    if client_params['method'] == 'entry':
        kg.lookup_entry(client_params, server_param)
    server_param['id'] = client_params['id']
    server_param['jsonrpc'] = client_params['jsonrpc']
    server_param['method'] = client_params['method']
    print("server_param:\n", server_param)
    global mydata
    if 'result' in server_param.keys():
        mydata = server_param['result']
    else:
        mydata = '{}'
    print("mydata:\n", mydata)
    return render_template("index.html")


def handle_args(originArgs):
    if 'name' not in originArgs.keys():
        return None
    args = {}
    for item in originArgs:
        key = item
        value = originArgs[key]
        if key == "params":
            kvs = str(value).split("-")
            kv_dic = {}
            for item in kvs:
                kv = item.split("=")
                k = kv[0]
                v = kv[1]
                if v.isnumeric():
                    kv_dic[k] = int(v)
                else:
                    kv_dic[k] = v
            args[key] = kv_dic
        else:
            if value.isnumeric():
                args[key] = int(value)
            else:
                args[key] = value
    if 'params' not in args.keys():
        args['params'] = {'name': args['name']}
        args.pop('name')
    args['params']['name'] = args['params']['name'].replace('\'', '\\\'')
    if 'method' not in args.keys():
        args['method'] = 'entry'
    if 'deep' not in args['params'].keys():
        args['params']['deep'] = 2
    if 'jsonrpc' not in args.keys():
        args['jsonrpc'] = 2.0
    if 'id' not in args.keys():
        args['id'] = 1
    return args


@app.route("/KnowGraph/data")
def data():
    print("data:", mydata)
    return mydata
Querying the database
Finally, we turn our attention to the database queries and result processing in module.py.
For easy inspection, the results are written to a json file, so in memory the query results are kept in a dictionary. Before each query the dictionary is cleared, the query runs, and then different handling logic kicks in depending on whether there are results. The lookup_entry function can therefore be written as:
def lookup_entry(self, client_params, server_param):
    # The depth of the graph traversal is configurable
    start_time = time.time()
    params = client_params["params"]
    edges = set()
    sim_dict.clear()
    self.lookup_entry_deep(edges, params, 0)
    if len(edges) == 0:
        server_param['success'] = 'false'
    else:
        self.handleResult(edges, server_param, start_time)
All entity queries go through lookup_entry_deep(). Normally the depth is only two levels: at the first level we query the properties of the user's series (for example, the director of 上将许世友); at the second level we take each property and look up the other entities attached to it (for example, what else that director of 上将许世友 has directed). Clearly, the first level is a forward lookup and the second is a reverse lookup.
During the lookup, to avoid recommending the series the user has just watched, we also deduplicate the results. For example, searching from 上将XXX, once we find its director 安澜 and run the reverse lookup on 安澜, if 安澜 has directed only this one series, there is no need (nor would it be right) to add 上将许世友 to the recommendation list.
For this case where no other entity turns up, I defined the return value 'nothing else'; if nothing at all is found, it is 'nothing got'; if the depth limit is exceeded, 'deep out'; and if all is well, 'ok'.
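The two-level traversal and the 'nothing else' deduplication rule can be illustrated on a toy in-memory graph (a sketch with made-up titles 剧A/剧B standing in for real series; the actual implementation runs Cypher against Neo4j):

```python
# Toy graph: (title, relation, property) triples, standing in for Neo4j.
TRIPLES = [
    ("剧A", "director", "安澜"),
    ("剧B", "director", "安澜"),
    ("剧A", "language", "普通话"),
]

def reverse_lookup(prop, origin_title):
    """Second-level (reverse) lookup: find other titles sharing `prop`,
    mimicking the status strings of lookup_entry_deep."""
    hits = [(t, r) for (t, r, p) in TRIPLES if p == prop]
    if not hits:
        return 'nothing got', []
    others = [(t, r) for (t, r) in hits if t != origin_title]
    if not others:
        # Only the input series has this property: nothing to recommend.
        return 'nothing else', []
    return 'ok', others

print(reverse_lookup("安澜", "剧A"))    # ('ok', [('剧B', 'director')])
print(reverse_lookup("普通话", "剧A"))  # ('nothing else', [])
```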
We first run the lookups in both directions:
result1 = self.graph.run(cypher='''match (s)-[r]->(e) where s.name='{}'
        return s.name,r.name,e.name'''.format(params['name'])).data()
result2 = self.graph.run(cypher='''match (e)<-[r]-(s) where e.name='{}'
        return s.name,r.name,e.name'''.format(params['name'])).data()
Then check both results for emptiness; if both are empty, return 'nothing got':
if len(result1) == 0 and len(result2) == 0:
    return 'nothing got'
If result2 (the reverse lookup result) has only one entry, and that entry's s.name (the series title) is still the input entity while its e.name (the property) is still the original property, we return 'nothing else' directly:
if len(result2) == 1:
    item = result2[0]
    if origin_tv_name is not None and origin_property_name is not None:
        if origin_property_name == item['e.name'] and origin_tv_name == item['s.name']:
            return 'nothing else'
Here origin_tv_name and origin_property_name are parameters of lookup_entry_deep, both defaulting to None.
Next we iterate over the forward results in result1, joining the property value (e.name), relation name (r.name), and original series (s.name) into triples stored in the edges set.
for item in result1:
    tv_name = item['s.name']
    property_name = item['e.name']
    has_result = False
    if tv_name != property_name:  # avoid loops such as 双面胶 -> 中文名 -> 双面胶
        if oldName != property_name:
            params['name'] = property_name
            has_result = self.lookup_entry_deep(edges, params.copy(), deep + 1,
                                                origin_tv_name=tv_name,
                                                origin_property_name=property_name)
oldName is the entity name of the current query; the extra check guards against an infinite loop, although in our scenario the condition is always true.
Next we process the reverse lookup results. When a new series turns up, we first derive a similarity score from the relation between the new series and the shared property. Then we add the new series, the shared property name, and the score to the similarity dictionary (either accumulating onto an existing entry or creating a new one) and to the edges set:
for item in result2:
    tv_name = item['s.name']
    property_name = item['e.name']
    relation_name = item['r.name']
    if tv_name != origin_tv_name:
        score = get_sim_score_accroding_to_relation(relation_name)
        if tv_name not in sim_dict.keys():
            sim_dict[tv_name] = {relation_name: [property_name],
                                 "similarity": score}
        else:
            item_dict = sim_dict[tv_name]
            if relation_name in item_dict.keys() and \
                    property_name in item_dict.values():
                continue
            if relation_name in item_dict.keys():
                item_dict[relation_name].append(property_name)
            else:
                item_dict[relation_name] = [property_name]
            item_dict["similarity"] += score
        edges.add((tv_name, relation_name, property_name))
The helper get_sim_score_accroding_to_relation(), which maps a relation to a similarity score, is:
def get_sim_score_accroding_to_relation(relation_name):
    if relation_name in ['actor', 'director', 'tag']:
        return 1.0
    elif relation_name in ['language', 'country']:
        return 0.5
    return 0.0
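As a worked example of how the scores accumulate: a candidate sharing one actor, one tag, the language, and the country with the input ends up at 1.0 + 1.0 + 0.5 + 0.5 = 3.0 (a sketch reusing the weights above; the misspelled function name matches the original code):

```python
def get_sim_score_accroding_to_relation(relation_name):
    # Same weights as in the function above.
    if relation_name in ['actor', 'director', 'tag']:
        return 1.0
    elif relation_name in ['language', 'country']:
        return 0.5
    return 0.0

# Relations a hypothetical candidate shares with the input series.
shared = ['actor', 'tag', 'language', 'country']
similarity = sum(get_sim_score_accroding_to_relation(r) for r in shared)
print(similarity)  # 3.0
```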
The complete lookup_entry_deep() function:
# Depth-limited lookup
def lookup_entry_deep(self, edges, params, deep, origin_tv_name=None, origin_property_name=None):
    # The current depth must stay below the requested depth
    if deep >= params['deep']:
        return 'deep out'
    # Forward lookup
    oldName = str(params['name'])
    if oldName.__contains__("\'") and not oldName.__contains__("\\\'"):
        params['name'] = oldName.replace("\'", "\\\'")
    result1 = self.graph.run(cypher='''match (s)-[r]->(e) where s.name='{}'
            return s.name,r.name,e.name'''.format(params['name'])).data()
    result2 = self.graph.run(cypher='''match (e)<-[r]-(s) where e.name='{}'
            return s.name,r.name,e.name'''.format(params['name'])).data()
    if len(result1) == 0 and len(result2) == 0:
        return 'nothing got'
    if len(result2) == 1:
        item = result2[0]
        if origin_tv_name is not None and origin_property_name is not None:
            if origin_property_name == item['e.name'] and origin_tv_name == item['s.name']:
                return 'nothing else'
    for item in result1:
        tv_name = item['s.name']
        property_name = item['e.name']
        if tv_name != property_name:  # avoid loops such as 双面胶 -> 中文名 -> 双面胶
            if oldName != property_name:
                params['name'] = property_name
                has_result = self.lookup_entry_deep(edges, params.copy(), deep + 1,
                                                    origin_tv_name=tv_name,
                                                    origin_property_name=property_name)
    for item in result2:
        has_result = False
        tv_name = item['s.name']
        property_name = item['e.name']
        relation_name = item['r.name']
        if tv_name != origin_tv_name:
            score = get_sim_score_accroding_to_relation(relation_name)
            if tv_name not in sim_dict.keys():
                sim_dict[tv_name] = {relation_name: [property_name],
                                     "similarity": score}
            else:
                item_dict = sim_dict[tv_name]
                if relation_name in item_dict.keys() and \
                        property_name in item_dict.values():
                    continue
                if relation_name in item_dict.keys():
                    item_dict[relation_name].append(property_name)
                else:
                    item_dict[relation_name] = [property_name]
                item_dict["similarity"] += score
            edges.add((tv_name, relation_name, property_name))
    return 'ok'
Once the query finishes, if there are results we go to handleResult() to process them for output. It mainly sorts the candidates by similarity in descending order, takes the top 20, and writes them to a json file. That part of the code is:
def handleResult(self, edges, server_param, start_time):
    ....
    sorted_sim_list = sorted(sim_dict.items(), key=lambda x: x[1]['similarity'], reverse=True)
    ret = {}
    for i in range(len(sorted_sim_list)):
        if i >= 20:
            break
        ret[sorted_sim_list[i][0]] = sorted_sim_list[i][1]
    mydata = json.dumps(ret, ensure_ascii=False)
    print('JSON path: %s' % (fname))
    self.clear_and_write_file(fname, mydata)

def clear_and_write_file(self, fname, mydata):
    with open(fname, 'w', encoding='utf-8') as f:
        f.write(str(""))
    with open(fname, 'a', encoding='utf-8') as f:
        f.write(str(mydata))
In addition, I also put the results into server_param, which is how they are returned to the front-end page. That part of the code is:
ret = []
for result in edges:
    ret.append({"source": result[0],
                "target": result[2],
                "relation": result[1],
                "label": "relation"})
print("ret:", ret)
server_param['result'] = {"edges": ret}
server_param['success'] = 'true'
print('Number of triples found: {}, time cost: {}s'.format(len(ret), time.time() - start_time))
The complete result-handling function:
def handleResult(self, edges, server_param, start_time):
    ret = []
    for result in edges:
        ret.append({"source": result[0],
                    "target": result[2],
                    "relation": result[1],
                    "label": "relation"})
    print("ret:", ret)
    server_param['result'] = {"edges": ret}
    server_param['success'] = 'true'
    print('Number of triples found: {}, time cost: {}s'.format(len(ret), time.time() - start_time))
    sorted_sim_list = sorted(sim_dict.items(), key=lambda x: x[1]['similarity'], reverse=True)
    ret = {}
    for i in range(len(sorted_sim_list)):
        if i >= 20:
            break
        ret[sorted_sim_list[i][0]] = sorted_sim_list[i][1]
    mydata = json.dumps(ret, ensure_ascii=False)
    print('JSON path: %s' % (fname))
    self.clear_and_write_file(fname, mydata)
Results
First start the service by running run_server.py, then enter the following URL in the browser's address bar (XXX is the input title):
http://210.41.97.169:8090/KnowGraph/v2?name=XXX
The page output is shown below.
The rendered graph is quite sprawling, so let's look instead at the top 20 entries written to the json file:
{
  "XXX元帅": {"actor": ["侯勇", "刘劲"], "similarity": 14.0, "language": ["普通话"], "country": ["中国大陆"], "tag": ["传记", "上星剧", "55-70", "45-55", "36-45", "24-36", "热血", "革命", "战争", "历史", "电视剧"]},
  "BBB": {"actor": ["刘劲", "王伍福"], "similarity": 14.0, "language": ["普通话"], "country": ["中国大陆"], "tag": ["传记", "上星剧", "55-70", "45-55", "36-45", "24-36", "热血", "革命", "战争", "历史", "电视剧"]},
  "长征大会师": {"actor": ["刘劲", "王伍福"], "similarity": 14.0, "language": ["普通话"], "country": ["中国大陆"], "tag": ["上星剧", "55-70", "45-55", "36-45", "24-36", "激昂", "热血", "革命", "战争", "历史", "电视剧"]},
  "战将": {"language": ["普通话"], "similarity": 13.0, "country": ["中国大陆"], "tag": ["传记", "上星剧", "55-70", "45-55", "36-45", "24-36", "热血", "动作", "革命", "战争", "历史", "电视剧"]},
  "炮神": {"language": ["普通话"], "similarity": 13.0, "country": ["中国大陆"], "tag": ["上星剧", "55-70", "45-55", "36-45", "24-36", "激昂", "动作", "革命", "军旅", "战争", "历史", "电视剧"]},
  "独立纵队": {"language": ["普通话"], "similarity": 13.0, "country": ["中国大陆"], "tag": ["上星剧", "55-70", "45-55", "36-45", "24-36", "激昂", "热血", "动作", "革命", "战争", "历史", "电视剧"]},
  "女子军魂": {"language": ["普通话"], "similarity": 13.0, "country": ["中国大陆"], "tag": ["上星剧", "55-70", "45-55", "36-45", "24-36", "激昂", "热血", "革命", "军旅", "战争", "历史", "电视剧"]},
  "热血军旗": {"actor": ["侯勇"], "similarity": 12.0, "language": ["普通话"], "country": ["中国大陆"], "tag": ["上星剧", "55-70", "45-55", "36-45", "热血", "动作", "革命", "战争", "历史", "电视剧"]},
  "擒狼": {"language": ["普通话"], "similarity": 12.0, "country": ["中国大陆"], "tag": ["上星剧", "55-70", "45-55", "36-45", "24-36", "激昂", "动作", "革命", "战争", "历史", "电视剧"]},
  "信者无敌": {"language": ["普通话"], "similarity": 12.0, "country": ["中国大陆"], "tag": ["上星剧", "55-70", "45-55", "36-45", "24-36", "激昂", "热血", "革命", "战争", "历史", "电视剧"]},
  "我的抗战之猎豹突击": {"language": ["普通话"], "similarity": 12.0, "country": ["中国大陆"], "tag": ["上星剧", "55-70", "45-55", "36-45", "24-36", "激昂", "热血", "革命", "战争", "历史", "电视剧"]},
  "魔都风云": {"language": ["普通话"], "similarity": 12.0, "country": ["中国大陆"], "tag": ["上星剧", "55-70", "45-55", "36-45", "24-36", "激昂", "热血", "动作", "革命", "战争", "电视剧"]},
  "英雄戟之影子战士": {"language": ["普通话"], "similarity": 12.0, "country": ["中国大陆"], "tag": ["55-70", "45-55", "36-45", "24-36", "激昂", "热血", "动作", "革命", "战争", "历史", "电视剧"]},
  "第一声枪响": {"language": ["普通话"], "similarity": 12.0, "country": ["中国大陆"], "tag": ["上星剧", "55-70", "45-55", "36-45", "24-36", "激昂", "热血", "革命", "战争", "历史", "电视剧"]},
  "亮剑": {"language": ["普通话"], "similarity": 12.0, "country": ["中国大陆"], "tag": ["上星剧", "45-55", "36-45", "24-36", "激昂", "热血", "动作", "革命", "战争", "历史", "电视剧"]},
  "飞虎队": {"language": ["普通话"], "similarity": 12.0, "country": ["中国大陆"], "tag": ["上星剧", "45-55", "36-45", "24-36", "激昂", "热血", "动作", "革命", "战争", "历史", "电视剧"]},
  "伟大的转折": {"language": ["普通话"], "similarity": 12.0, "country": ["中国大陆"], "tag": ["上星剧", "55-70", "45-55", "36-45", "24-36", "激昂", "热血", "革命", "战争", "历史", "电视剧"]},
  "太行英雄传": {"language": ["普通话"], "similarity": 12.0, "country": ["中国大陆"], "tag": ["上星剧", "45-55", "36-45", "24-36", "激昂", "热血", "动作", "革命", "战争", "历史", "电视剧"]},
  "雪豹": {"language": ["普通话"], "similarity": 12.0, "country": ["中国大陆"], "tag": ["上星剧", "55-70", "45-55", "36-45", "24-36", "激昂", "革命", "军旅", "战争", "历史", "电视剧"]},
  "宜昌保卫战": {"actor": ["侯勇"], "similarity": 11.0, "language": ["普通话"], "country": ["中国大陆"], "tag": ["上星剧", "45-55", "36-45", "24-36", "激昂", "革命", "战争", "历史", "电视剧"]}
}
The top entries are all series closely related to our input, with their similarity scores and shared properties listed right alongside, so the results look decent.
Conclusion
This is just a demo, meant to get a taste of applying a knowledge graph to a recommendation system.
Finally, thanks again to the original project's author; without the framework built by his hard work, I could hardly have taken this first practical step.
Once again, the original project's address: