#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Dataset class: create, modify, export and import Datasets from Wikidata
# Copyright (C) 2016 Víctor Fernández Rico <vfrico@gmail.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import requests
import json
import pickle
import numpy as np
import threading
from datetime import datetime
import time
import copy
import logging
from collections import defaultdict
# Silence info/debug logging from the requests library
logging.getLogger("requests").setLevel(logging.WARNING)
class Dataset():
"""
Class used to create, modify, export and import Datasets from Wikidata
"""
SPARQL_ENDPOINT = \
"""https://query.wikidata.org/bigdata/namespace/wdq/sparql?query="""
entities = []
entities_dict = {}
relations = []
relations_dict = {}
subs = []
# Used to show current status
status = {'started': 0,
'round_curr': 0,
'round_total': 0,
'it_analyzed': 0,
'it_total': 0,
'active': False}
def __init__(self, sparql_endpoint=None, thread_limiter=4):
"""Creates the dataset class
The default endpoint is the official Wikidata one.
:param string sparql_endpoint: The URI of the SPARQL endpoint
:param integer thread_limiter: The number of concurrent HTTP queries
"""
if sparql_endpoint is not None:
self.SPARQL_ENDPOINT = sparql_endpoint
self.th_semaphore = threading.Semaphore(thread_limiter)
# self.query_sem = threading.Semaphore(thread_limiter)
# Initialize split subs as not updated
self.splited_subs = {'updated': False}
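# Illustrative usage sketch (not part of the class itself): build a Dataset
# against a custom endpoint. The URL below is a placeholder, not a value
# required by this library.
#
#   dataset = Dataset(
#       sparql_endpoint="https://example.org/sparql?query=",
#       thread_limiter=2)
#   dataset.show()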
def show(self, verbose=False):
"""Show all elements of the dataset
By default prints only one line with the number of entities, relations
and triplets. If verbose, prints every list. Use wisely
:param bool verbose: If true prints every item of all lists
"""
print("%d entities, %d relations, %d tripletas" %
(len(self.entities), len(self.relations), len(self.subs)))
if verbose is True:
print("\nEntities (%d):" % len(self.entities))
for entity in self.entities:
print(entity)
print("\nRelations (%d):" % len(self.relations))
for relation in self.relations:
print(relation)
print("\nTripletas (%d):" % len(self.subs))
for sub in self.subs:
print(sub)
def add_element(self, element, complete_list,
complete_list_dict):
"""Add element to a list of the dataset. Avoids duplicate elements.
:param string element: The element that will be added to list
:param list complete_list: The list to which the element will be added
:param dict complete_list_dict: The dict which indexes the list.
:return: The id on the list of the added element
:rtype: integer
"""
if element in complete_list_dict:
# Item is on the list, return same id
return complete_list_dict[element]
else:
# Item is not on the list, append and return id
complete_list.append(element)
id_item = len(complete_list)-1
complete_list_dict[element] = id_item
return id_item
def add_entity(self, entity):
return self.add_element(entity, self.entities,
self.entities_dict)
def add_relation(self, relation):
return self.add_element(relation, self.relations,
self.relations_dict)
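# Sketch of the id-assignment behaviour, assuming an existing `dataset`
# instance: adding the same element twice returns the same id.
#
#   first_id = dataset.add_entity("http://www.wikidata.org/entity/Q1")
#   second_id = dataset.add_entity("http://www.wikidata.org/entity/Q1")
#   assert first_id == second_id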
def exist_element(self, element, complete_list_dict):
"""Check if element exists on a given list
:param string element: The element itself
:param dict complete_list_dict: The dictionary to search in
:return: Whether the item was found or not
:rtype: bool
"""
if element in complete_list_dict:
return True
else:
return False
def check_entity(self, entity):
"""Check the entity given and return a valid representation
The parent class assumes all entities are valid
:param string entity: The input entity representation
:return: A valid representation or None
:rtype: string
"""
return entity
def check_relation(self, relation):
"""Check the relation given and return a valid representation
The parent class assumes all relations are valid
:param string relation: The input relation representation
:return: A valid representation or None
:rtype: string
"""
return relation
def get_entity_id(self, entity):
"""Gets the id given an entity
:param string entity: The entity string
"""
try:
return self.entities_dict[entity]
except (KeyError, ValueError):
return -1
def get_entity(self, id):
"""Gets the entity given an id
:param integer id: The id to find
"""
try:
return self.entities[id]
except (IndexError, TypeError):
return None
def get_relation(self, id):
"""Gets the relation given an id
:param int id: The relation identifier to find
"""
try:
return self.relations[id]
except (IndexError, TypeError):
return None
def get_relation_id(self, relation):
"""Gets the id given an relation
:param string entity: The relation string
"""
try:
return self.relations_dict[relation]
except (KeyError, ValueError):
return -1
def add_triple(self, subject, obj, pred):
"""Add the triple (subject, object, pred) to dataset
This method will add the three elements and append the tuple of the
relation into the dataset
:param string subject: Subject of the triple
:param string obj: Object of the triple
:param string pred: Predicate of the triple
:return: If the operation was correct
:rtype: boolean
"""
subject = self.check_entity(subject)
pred = self.check_relation(pred)
obj = self.check_entity(obj)
if pred and obj and subject:
# Add relation
id_subj = self.add_entity(subject)
id_pred = self.add_relation(pred)
id_obj = self.add_entity(obj)
# add_element always returns an integer id, so only guard against None
if id_subj is not None and id_pred is not None and id_obj is not None:
self.subs.append((id_subj, id_obj, id_pred))
self.splited_subs['updated'] = False
return True
return False
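# Hedged example of add_triple; note the (subject, object, predicate)
# argument order. The URIs are illustrative only.
#
#   ok = dataset.add_triple(
#       "http://www.wikidata.org/entity/Q1",         # subject
#       "http://www.wikidata.org/entity/Q2",         # object
#       "http://www.wikidata.org/prop/direct/P31")   # predicate
#   # The triple is stored in dataset.subs as (id_subj, id_obj, id_pred)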
def load_dataset_from_csv(self, file_readable, separator_char=","):
"""Given a csv file, loads into the dataset
This method will not open or close any file; it should be given only
an iterable object where each iteration yields a single line. If the
csv does not use commas, you should also provide the separator char,
which the code will use to split each line.
It will use only the first three columns, in the order:
(subject, predicate, object)
:param Iterable file_readable: An iterator object
:param string separator_char: the separator string used in each line
:returns: If the process ends correctly
:rtype: boolean
"""
rt_check = True
for line in file_readable:
triple = line.rstrip().split(separator_char)
rt_check = rt_check and self.add_triple(triple[0],
triple[2], triple[1])
return rt_check
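# Minimal sketch of feeding a CSV file. The method never opens or closes
# files itself; the path below is hypothetical.
#
#   with open("triples.csv") as csv_file:
#       dataset.load_dataset_from_csv(csv_file, separator_char=";")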
def load_dataset_from_json(self, json):
"""Loads the dataset object with a JSON
Each element of the given list must follow the SPARQL JSON bindings
structure:
{'subject': {'value': ...}, 'predicate': {'value': ...}, 'object': {'value': ...}}
:param list json: A list of dictionaries parsed from JSON
:return: If operation was successful
:rtype: bool
"""
result = True
for triple in json:
added = self.add_triple(triple["subject"]['value'],
triple["object"]['value'],
triple["predicate"]['value'])
result = result and added
return result
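# Shape of the expected input, mirroring SPARQL JSON bindings (the values
# are illustrative):
#
#   bindings = [{"subject": {"value": "Q1"},
#                "predicate": {"value": "P31"},
#                "object": {"value": "Q5"}}]
#   dataset.load_dataset_from_json(bindings)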
def load_dataset_from_query(self, query):
"""Receives a Sparql query and fills dataset object with the response
The method will execute the query itself and will call to other method
to fill in the dataset object
:param string query: A valid SPARQL query
"""
result_query = self.execute_query(query)
if result_query[0] != 200:
raise Exception("Error on endpoint. HTTP status code: " +
str(result_query[0]))
else:
jsonlist = result_query[1]
# print(json.dumps(jsonlist, indent=4, sort_keys=True))
self.load_dataset_from_json(jsonlist)
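# Sketch of a query this loader understands: the SELECT must bind
# ?subject, ?predicate and ?object, because load_dataset_from_json reads
# exactly those names. The graph pattern itself is only an example.
#
#   query = ("SELECT ?subject ?predicate ?object "
#            "WHERE { ?subject ?predicate ?object } LIMIT 100")
#   dataset.load_dataset_from_query(query)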
def load_dataset_from_nlevels(self, nlevels, extra_params=""):
"""Builds a nlevels query, executes, and loads data on object
:deprecated:
:param integer nlevels: Depth of the search on the wikidata graph
:param string extra_params: Extra SPARQL instructions for the query
"""
query = self.build_n_levels_query(nlevels)+" "+extra_params
print(query)
return self.load_dataset_from_query(query)
def build_levels(self, n_levels):
"""Generates a simple *chain* of triplets for the desired levels
:deprecated:
:param integer n_levels: Depth of the search on the wikidata graph
:return: A list of chained triplets
:rtype: list
"""
ob1 = "wikidata"
pre = "predicate"
ob2 = "object"
pre_base = pre
obj_base = ob2
predicateCount = 1
objectCount = 1
tripletas = []
for level in range(1, n_levels+1):
tripletas.append((ob1, pre, ob2))
objectCount += 1
predicateCount += 1
ob1 = ob2
ob2 = obj_base + str(objectCount)
pre = pre_base + str(predicateCount)
return tripletas
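# For reference, build_levels(2) yields a chain like:
#   [("wikidata", "predicate", "object"),
#    ("object", "predicate2", "object2")]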
def build_n_levels_query(self, n_levels=3):
"""Builds a CONSTRUCT SPARQL query of the desired deep
:deprecated:
:param integer n_levels: Depth of the search on the wikidata graph
:return: The desired chained query
:rtype: string
"""
lines = []
for level in self.build_levels(n_levels):
lines.append("?"+level[0]+" ?"+level[1]+" ?"+level[2])
query = """PREFIX wikibase: <http://wikiba.se/ontology>
construct {{ {0} }}
WHERE {{ ?wikidata wdt:P950 ?bne .
{1}
}} """.format(" . ".join(lines), " . \n".join(lines))
return query
def load_entire_dataset(self, levels,
where="", batch=100000, verbose=True):
"""Loads the dataset by quering to Wikidata on the desired levels
:deprecated:
:param integer levels: Deep of the search
:param string where: Extra where statements for SPARQL query
:param integer batch: Number of elements returned each query
:param bool verbose: True for showing all steps the method do
:return: True if operation was successful
:rtype: bool
"""
# Generate select query to get entities count
lines = []
for level in self.build_levels(levels):
lines.append("?"+level[0]+" ?"+level[1]+" ?"+level[2])
count_query = """PREFIX wikibase: <http://wikiba.se/ontology>
SELECT (count(distinct ?object) as ?count)
WHERE {{ ?wikidata wdt:P950 ?bne .
{1}
{0}
}} """.format(" . \n".join(lines), where)
if verbose:
print("Query is: "+count_query)
code, count_json = self.execute_query(count_query)
if verbose:
print(code, count_json)
tuples_number = int(count_json[0]['count']['value'])
# Generate several LIMIT & OFFSET queries
base_query = self.build_n_levels_query(n_levels=levels)
# Number of queries to do: add one more to last iteration
n_queries = int(tuples_number / batch) + 1
json_total = []
for query in range(0, n_queries):
if verbose:
print("\n\nEmpieza ronda {} de {}".format(query, n_queries))
limit_string = " LIMIT {} OFFSET {}".format(batch, 0*batch)
# print(str(query)+"\n\n"+base_query + limit_string)
sts, resp = self.execute_query(base_query + limit_string)
if sts != 200:
print(resp)
if verbose:
print(sts, len(resp))
print("Guardando en el dataset...")
self.load_dataset_from_json(resp)
if verbose:
print("Guardado!")
self.show()
def _process_entity(self, entity, verbose=None):
"""Add all relations and entities related with the entity to dataset
Additionally, this method should return a list of the entities it is
connected to scan those entities on next level exploration.
This method is not implemented by parent class. **MUST** be
implemented through a child object
:param string entity: The URI of the element to be processed
:param int verbose: The level of verbosity. 0 is low, and 2 is high
:return: Entities to be scanned in next level
:rtype: List
"""
raise NotImplementedError("The method _process_entity should be "
"implemented through a child object")
def process_entity(self, entity, append_queue=lambda x: None, max_tries=10,
callback=lambda x: None, verbose=0, _times=0, **kwargs):
"""Wrapper for child method `dataset._process_entity`_
Will call self method `dataset._process_entity`_ and examine the return
value: should return a list of elements to be queried again or None.
This method will run in a single thread
:param string entity: The URI of the entity to be scanned
:param function append_queue: A function that receives the subject of
a triplet as an argument
:param integer verbose: The level of verbosity. 0 is low, and 2 is high
:param function callback: The callback function. Default is return
:param int max_tries: If an exception is raised, max number of attempts
:param int _times: Reserved for recursive calls. Don't use
:return: If operation was successful
:rtype: boolean
"""
try:
# Get elements to add on the queue.
el_queue = self._process_entity(entity, verbose=verbose, **kwargs)
# print(el_queue)
if not el_queue:
return callback(False)
else:
for element in el_queue:
append_queue(element)
return callback(True)
except Exception as exc:
# If an exception such as ConnectionError or similar appears,
# try again (up to max_tries times)
times_new = _times + 1
if times_new < max_tries:
print("[{0}]Error found: '{1}' "
"Trying again".format(times_new, exc))
return self.process_entity(entity, append_queue=append_queue,
callback=callback, verbose=verbose,
_times=times_new)
else:
print("[{0}]Error found: '{1}'' Has been tried {0}/{2} times. "
"Exiting".format(times_new, exc, max_tries))
return False
def load_from_graph_pattern(self):
"""Get the root entities where the graph build should start
This should return a list of elements from which to start seeking their
children and building a dataset graph from these root elements.
This method will return *all* the entities on the dataset.
**MUST** be implemented through a child object
:return: An entities list
:rtype: list
"""
raise NotImplementedError("The method `load_from_graph_pattern` should"
" be implemented through a child object")
def load_dataset_recurrently(self, levels, seed_vector, verbose=1,
limit_ent=None, ext_callback=lambda x: None,
**keyword_args):
"""Loads to dataset all entities with BNE ID and their relations
Due to Wikidata endpoint cann't execute queries that take long time
to complete, it is necessary to consruct the dataset entity by entity,
without using SPARQL CONSTRUCT. This method will start concurrently
some threads to make several SPARQL SELECT queries.
:param list seed_vector: A vector of entities to start with
:param integer levels: The depth to get triplets
:param integer verbose: The level of verbosity. 0 is low, and 2 is high
:return: True if operation was successful
:rtype: bool
"""
new_queue = seed_vector
el_queue = []
self.status['started'] = datetime.now()
self.status['it_analyzed'] = 0
self.status['active'] = True
self.status['round_total'] = levels
if verbose > 1:
# TODO: The status thread should be killable from outside
# Initialize status variables
status_thread = threading.Thread(
target=self.control_thread,
args=(),)
status_thread.start()
# Loop for depth levels
for level in range(0, levels):
# Loop for every item on the queue
# Interchange lists
el_queue = copy.copy(new_queue)
new_queue = []
# Apply limitation
if limit_ent is not None:
el_queue = el_queue[:limit_ent*((level+1)**3)]
if verbose > 0:
print("Scanning level {}/{} with {} elements"
.format(level+1, levels, len(el_queue)))
# Initialize some status variables
self.status['round_curr'] = level
self.status['it_total'] = len(el_queue)
self.status['it_analyzed'] = 0
# pool for threads
threads = []
# Scan every entity on queue
for element in el_queue:
# Generate n threads, start them and save into pool
def func_callback(status):
self.status['it_analyzed'] += 1
self.th_semaphore.release()
ext_callback(self.status)
self.th_semaphore.acquire()
call_kwargs = copy.copy(keyword_args)
call_kwargs['verbose'] = verbose
call_kwargs['append_queue'] = lambda e: new_queue.append(e)
call_kwargs['callback'] = func_callback
t = threading.Thread(
target=self.process_entity,
args=(element, ),
kwargs=call_kwargs)
threads.append(t)
t.start()
if verbose > 0:
print("Waiting all threads to end")
# Wait for threads to end
for th in threads:
th.join()
if verbose > 1:
# To help kill the status thread it may
# be useful to input a 'q' character on stdin
self.status['active'] = False
return True
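# Hedged usage sketch: this parent class raises NotImplementedError for
# _process_entity and load_from_graph_pattern, so the calls below assume
# a child class that implements both.
#
#   seeds = dataset.load_from_graph_pattern()
#   dataset.load_dataset_recurrently(levels=2, seed_vector=seeds, verbose=1)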
def control_thread(self):
"""
Starts a loop waiting for user to request information about progress
This method should not be called from any method
other than 'load_dataset_recurrently'
TODO: Should end when parent thread ends...
"""
self.status['active'] = True
while self.status['active']:
ui = input("Enter S to show status: ")
if ui is "s" or ui is "S":
print(self.get_status())
elif ui is "q" or ui is "Q":
self.status['active'] = False
def get_status(self):
"""
Returns a formatted string with the current progress
This is a helper method and should not be called from
any method other than dataset.load_dataset_recurrently_
:return: Current download progress
:rtype: string
"""
elapsed = datetime.now() - self.status['started']
try:
scanned = 100 * (self.status['it_analyzed'] /
self.status['it_total'])
except ZeroDivisionError:
scanned = 0
status_str = ("Elapsed time: {0}s. Depth {1} of {2}."
" Entities scanned: {3:.2f}% ({4} of {5})"
" Active threads: {6}").format(
elapsed.seconds,
self.status['round_curr']+1,
self.status['round_total'],
scanned,
self.status['it_analyzed'],
self.status['it_total'],
threading.active_count())
return status_str
def save_to_binary(self, filepath, improved_split=False):
"""Saves the dataset object on the disk
The dataset will be saved in the format required for reading
by the original library, and is ready to be trained.
:param string filepath: The path of the file where should be saved
:return: True if operation was successful
:rtype: bool
"""
print(self)
self.show()
if improved_split:
subs2 = self.improved_split()
else:
subs2 = self.train_split()
all_dataset = {
'entities': self.entities,
'relations': self.relations,
'train_subs': subs2['train_subs'],
'valid_subs': subs2['valid_subs'],
'test_subs': subs2['test_subs'],
'__class__': self.__class__
}
try:
f = open(filepath, "w+b")
except FileNotFoundError:
msg = "The path {0} is not valid or is not writable".format(
filepath)
raise FileNotFoundError(msg)
except Exception as err:
print("Error found:")
print(err)
return False
pickle.dump(all_dataset, f)
f.close()
return True
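# Sketch of a save/load round trip; the file name is illustrative.
#
#   dataset.save_to_binary("wikidata_dataset.bin")
#   restored = Dataset()
#   restored.load_from_binary("wikidata_dataset.bin")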
def load_from_binary(self, filepath, **kwargs):
"""Loads the dataset object from the disk
Loads this dataset object with the binary file
:param string filepath: The path of the binary file
:return: True if operation was successful
:rtype: bool
"""
try:
f = open(filepath, "rb")
except FileNotFoundError:
msg = "The path {0} is not valid".format(filepath)
raise FileNotFoundError(msg)
all_dataset = pickle.load(f)
f.close()
try:
self.__class__ = all_dataset['__class__']
self.__init__(**kwargs)
except KeyError:
# This is an old generated dataset
pass
self.entities = all_dataset['entities']
self.relations = all_dataset['relations']
self.subs = all_dataset['train_subs'] + all_dataset['valid_subs'] +\
all_dataset['test_subs']
# Fill dicts
self._load_elements_into_dict(self.entities_dict, self.entities)
self._load_elements_into_dict(self.relations_dict, self.relations)
self.splited_subs = {
'updated': True,
'train_subs': all_dataset['train_subs'],
'valid_subs': all_dataset['valid_subs'],
'test_subs': all_dataset['test_subs']
}
# self.subs = all_dataset['subs']
return True
def _load_elements_into_dict(self, el_dict, el_list):
"""Insert elements from a list into dict
:param dict el_dict: A dict to be inserted elements
:param list el_list: A list where elements are
"""
for i in range(0, len(el_list)):
el_dict[el_list[i]] = i
def improved_split(self, ratio=0.8):
"""Split made with sklearn library, with different split for each label
This split function makes different splits for each label which is
present on the dataset. This helps to distribute better all the
splits.
:param float ratio: The ratio of all triplets required for *train_subs*
:return: A dictionary with splited subs
:rtype: dict
"""
triples = np.matrix(self.subs)
trip_ddict = defaultdict(list)
for triple in triples:
# Label is the id of the relation
label = triple[0, 2]
# Append to the list of triples
trip_ddict[label].append(triple.tolist()[0])
# Once all triples have been classified on trip_ddict, split by label
from sklearn.model_selection import StratifiedShuffleSplit
train_triples = []
valid_triples = []
test_triples = []
for label in trip_ddict:
# Triples with current label
triples = np.array(trip_ddict[label])
sss = StratifiedShuffleSplit(n_splits=1, train_size=ratio)
# Generate a list with ones
tri_ones = np.ones(len(triples))
try:
for train_index, test_index in sss.split(triples, tri_ones):
train_test_sum = set(np.append(train_index, test_index))
# Generates the val_index with the index numbers that
# does not appear on train or test list.
val_index = [indx for indx in range(0, triples.shape[0])
if indx not in train_test_sum]
x_train = [tuple(x.tolist()) for x in triples[train_index]]
x_test = [tuple(x.tolist()) for x in triples[test_index]]
x_val = [tuple(x.tolist()) for x in triples[val_index]]
except ValueError:
# Only one triple exists with the current label; add it to the train set
x_train = [tuple(x) for x in trip_ddict[label]]
x_test = []
x_val = []
finally:
# Append generated triples to each set
train_triples += x_train
valid_triples += x_val
test_triples += x_test
# Save the split subs as a separate attribute. May be helpful
self.splited_subs = {'updated': True,
'train_subs': train_triples,
'valid_subs': valid_triples,
'test_subs': test_triples
}
return {"train_subs": train_triples,
"valid_subs": valid_triples,
"test_subs": test_triples}
def train_split(self, ratio=0.8):
"""Split subs into three lists: train, valid and test
The triplets should have a specific name and size to be compatible
with the original library. Splits the original triplets (self.subs) in
three different lists: *train_subs*, *valid_subs* and *test_subs*.
The 'ratio' param will leave that quantity for train_subs, and the
rest will be a half for valid and the other half for test
:param float ratio: The ratio of all triplets required for *train_subs*
:return: A dictionary with splited subs
:rtype: dict
"""
# Check whether split subs already exist and are up to date
if self.splited_subs and self.splited_subs['updated']:
return {"train_subs": self.splited_subs['train_subs'],
"valid_subs": self.splited_subs['valid_subs'],
"test_subs": self.splited_subs['test_subs']}
# Subs mustn't contain duplicates
self.subs = list(set(self.subs))
# if not, build split set and save as updated
if len(self.subs) == 0:
data = []
indices = 0
train_samples = 0
else:
data = np.matrix(self.subs)
indices = np.arange(data.shape[0])
np.random.shuffle(indices)
data = data[indices]
# Number of held-out samples (valid + test); the rest goes to train
train_samples = int((1-ratio) * data.shape[0])
x_train = [tuple(x.tolist()[0]) for x in data[:-train_samples]]
x_val = [tuple(x.tolist()[0]) for x in
data[-train_samples:-int(train_samples/2)]]
x_test = [tuple(x.tolist()[0]) for x in data[-int(train_samples/2):]]
# Save the split subs as a separate attribute. May be helpful
self.splited_subs = {'updated': True,
'train_subs': x_train,
'valid_subs': x_val,
'test_subs': x_test
}
return {"train_subs": x_train,
"valid_subs": x_val,
"test_subs": x_test}
def execute_query(self, query, headers={"Accept": "application/json"}):
"""Executes a SPARQL query to the endpoint
:param string query: The SPARQL query
:returns: A tuple of (http_status, json_or_error)
"""
try:
response = requests.get(self.SPARQL_ENDPOINT+query,
headers=headers)
if response.status_code != 200:
return (response.status_code, response.text)
else:
return (response.status_code,
response.json()["results"]["bindings"])
except requests.exceptions.ConnectionError:
raise ExecuteQueryError("Error on endpoint")
except json.decoder.JSONDecodeError:
print(response.content)
raise ExecuteQueryError("Error on JSON decoder")
class ExecuteQueryError(Exception):
"""ExecuteQueryError"""
def __init__(self, message):
"""Created when any error occurs when executing a query"""
super(ExecuteQueryError, self).__init__(message)