Skip to content
Snippets Groups Projects
Commit 8d979505 authored by Mesharo's avatar Mesharo
Browse files

kapitola, comments, bug fixes, serialization, a bit of parser

parent 4fba9e54
Branches
No related merge requests found
File added
......@@ -167,7 +167,7 @@ def get_questions(input_filepath_questions) -> list:
input_file_questions.close()
return result
def save_linked(output_filepath: str, linked_questions_answers: dict) -> None:
def save_linked(output_filepath_linked: str, linked_questions_answers: dict) -> None:
"""
Write linked postgresql questions with their answers into a file.
......@@ -176,7 +176,7 @@ def save_linked(output_filepath: str, linked_questions_answers: dict) -> None:
linked_questions_answers -- dictionary with questions as keys and lists of answers as values
"""
output_file = open(output_filepath, 'a', encoding='utf8')
output_file = open(output_filepath_linked, 'a', encoding='utf8')
for question, answers in linked_questions_answers.items():
output_file.write(str(question) + '\n')
......@@ -186,7 +186,7 @@ def save_linked(output_filepath: str, linked_questions_answers: dict) -> None:
output_file.close()
def link_questions_with_answers(input_filepath_answers: str, questions: list) -> None:
def link_questions_with_answers(input_filepath_answers: str, questions: list, output_filepath_linked: str) -> None:
"""Link postgresql questions with corresponding answers.
Sort postgresql question based on their ID.
......@@ -227,11 +227,9 @@ def link_questions_with_answers(input_filepath_answers: str, questions: list) ->
input_file_answers.close()
result = dict(zip(questions, result.values()))
save_linked(result)
save_linked(output_filepath_linked, result)
return result
def retrieve_linked(input_filepath_linked: str) -> dict:
def load_linked(input_filepath_linked: str) -> dict:
"""Get questions and answers.
Read saved postgresql questions with their corresponding
......@@ -290,4 +288,42 @@ def find_code_section(linked: dict) -> dict:
result[question[0]] = list_of_questions_list
return result
def save_code_sections(input_filepath_linked: str, output_filepath_codes: str) -> None:
    """Extract code sections and append them to a file.

    The linked questions/answers are loaded, their code sections are
    extracted with ``find_code_section`` and every dictionary entry is
    appended to the output file as one line holding the ``str()`` form of
    a ``(key, values)`` tuple:
    first element is the id of the question,
    second element is a list of lists containing separate code sections.

    Arguments:
    input_filepath_linked -- path to the file with questions and answers.
    output_filepath_codes -- path to where we want to save the codes.
    """
    linked = load_linked(input_filepath_linked)
    sections_by_question = find_code_section(linked)
    with open(output_filepath_codes, 'a', encoding='utf8') as output_file:
        # Each dict item already is the (key, values) tuple we want to store.
        output_file.writelines(
            str(entry) + '\n' for entry in sections_by_question.items()
        )
def load_code_sections(input_filepath_codes: str) -> dict:
    """Load codes from file.

    Every non-empty line of the file is expected to hold the textual form
    of a ``(question_id, code_sections)`` tuple, one tuple per line.

    Return dictionary.
    Key is id of question.
    Values is a list of lists containing separate code sections.

    Argument:
    input_filepath_codes -- path to the file with code sections.

    Raises:
    ValueError / SyntaxError -- when a line is not a plain Python literal.
    """
    # Local import keeps this function self-contained; the file-level import
    # block is not touched.
    import ast

    result = {}
    with open(input_filepath_codes, 'r', encoding='utf8') as f:
        for line in f:
            line = line.strip()
            if not line:
                # Blank lines carry no data; skip them instead of crashing.
                continue
            # literal_eval only accepts literals (tuples, lists, strings,
            # numbers, ...), so a tampered file cannot execute code the way
            # a plain eval() would.
            entry = ast.literal_eval(line)
            result[entry[0]] = entry[1]
    return result
\ No newline at end of file
from handling_dataset.main import retrieve_linked, find_code_section
import sqlparse
from sqlparse.tokens import Whitespace, Newline
import sqlglot.optimizer
import sqlglot.optimizer.qualify
from handling_dataset.main import filter_postgresql_questions, load_code_sections, get_questions, link_questions_with_answers, save_code_sections
import re
import os.path
import sqlglot
import copy
from io import StringIO
from html.parser import HTMLParser
......@@ -42,30 +45,99 @@ def erase_html(code_section: str) -> str:
return result
def print_parsed(id: str, codes: list) -> None:
    """Print each SQL statement found in the code sections with its tokens.

    Every code section is passed through ``erase_html``, split into
    statements with ``sqlparse.split`` and parsed with ``sqlparse.parse``.
    For each parsed statement the statement itself is printed, followed by
    its top-level tokens except whitespace and newline tokens.

    Arguments:
    id -- id of the question (not used by the function body).
    codes -- list of code sections.
    """
    skipped_types = (Whitespace, Newline)
    for raw_code in codes:
        plain_sql = erase_html(raw_code)
        for statement_text in sqlparse.split(plain_sql):
            for parsed_statement in sqlparse.parse(statement_text):
                print(f'Statement: {parsed_statement}')
                print('Tokens: ')
                visible_tokens = [
                    token
                    for token in parsed_statement.tokens
                    if token.ttype not in skipped_types
                ]
                for token in visible_tokens:
                    print(f' - {token}')
def analyze(id: str, codes: list) -> None:
"""Print the columns of each parsed statement together with their sources.

Each code section is passed through ``erase_html`` and parsed with
``sqlglot.parse`` using the postgres dialect; sections that raise
``ParseError`` or ``TokenError`` are skipped. Every resulting expression
tree is qualified, a scope is built for it and, when a scope exists,
each column found in that scope is printed next to
``root.sources[column.table]``. Trees that raise ``OptimizeError`` are
skipped.

Arguments:
id -- id of the question (not used by the function body).
codes -- list of code sections.
"""
# NOTE(review): indentation is not visible in this view; confirm the nesting
# of the two trailing separator prints against the original file.
print('CODES: ')
for code in codes:
code = erase_html(code)
#print(f'-- code: {code}')
# NOTE(review): this initial value is overwritten by the loop below before
# it is ever read.
expression_tree = False
try:
expression_trees = sqlglot.parse(code, dialect='postgres')
except sqlglot.errors.ParseError:
#print(f'Sqlglot failed to parse given statement')
continue
except sqlglot.errors.TokenError:
#print(f'Sqlglot failed to tokenize given statement')
continue
# NOTE(review): sqlglot.parse is presumably able to yield None entries for
# empty statements - confirm whether a None check is needed here.
for expression_tree in expression_trees:
#print(f'-- tree: {repr(expression_tree)}')
# NOTE(review): 'solved' is assigned but never read in this function.
solved = False
try:
# Copy of the tree taken before qualify() is called on it.
undo = copy.deepcopy(expression_tree)
sqlglot.optimizer.qualify.qualify(expression_tree)
root = sqlglot.optimizer.build_scope(expression_tree)
# build_scope's result is only used when it is truthy.
if root:
for column in sqlglot.optimizer.find_all_in_scope(root.expression, sqlglot.exp.Column):
print(f"{column} => {root.sources[column.table]}")
solved = True
except sqlglot.errors.OptimizeError:
#print(f'Sqlglot failed to optimize given statement.')
# Rebinds the loop variable to the copy; the name is reassigned by the next
# iteration.
expression_tree = undo
continue
print('-----------')
print('--- --- ---')
def run(
    input_filepath_all_answers: str,
    input_filepath_postgresql_questions: str,
    input_filepath_linked: str,
    input_filepath_codes: str,
    *,
    posts_filepath: str = 'D:\\stackoverflow\\Posts.xml',
    tags_filepath: str = 'D:\\stackoverflow\\Tags.xml',
    max_analyzed: int = 6,
) -> None:
    """Main function.

    Automatization/serialization.
    Go through the filepaths in function args, check if they exist.
    Depending on the stage the project is in, take the correct steps:
    every missing intermediate file is produced exactly once, starting
    from the latest stage that already exists on disk, and afterwards the
    first ``max_analyzed`` entries of the code sections file are analyzed.

    The stages are executed as a plain cascade. The earlier recursive
    formulation had no ``return`` after the recursive calls, so once the
    recursion finished the remaining stages ran again and appended
    duplicate data to the intermediate files.

    Arguments:
    input_filepath_all_answers -- path to the file with all answers from Posts.xml.
    input_filepath_postgresql_questions -- path to the file with only postgresql questions from Posts.xml.
    input_filepath_linked -- path to the file with postgresql questions linked with their corresponding answers.
    input_filepath_codes -- path to the file with filtered code sections out of postgresql questions and their corresponding answers.
    posts_filepath -- path to the Posts.xml dump (keyword-only).
    tags_filepath -- path to the Tags.xml dump (keyword-only).
    max_analyzed -- number of questions whose code sections are analyzed (keyword-only).
    """
    if not os.path.isfile(input_filepath_codes):
        if not os.path.isfile(input_filepath_linked):
            if not os.path.isfile(input_filepath_postgresql_questions):
                # creates input_filepath_all_answers, input_filepath_postgresql_questions
                filter_postgresql_questions(
                    posts_filepath,
                    tags_filepath,
                    input_filepath_postgresql_questions,
                    input_filepath_all_answers,
                )
            # creates input_filepath_linked
            link_questions_with_answers(
                input_filepath_all_answers,
                get_questions(input_filepath_postgresql_questions),
                input_filepath_linked,
            )
        # creates input_filepath_codes
        save_code_sections(input_filepath_linked, input_filepath_codes)

    codes = load_code_sections(input_filepath_codes)
    for count, (key, values) in enumerate(codes.items()):
        if count >= max_analyzed:
            # Only a small sample is analyzed; stop instead of walking the
            # rest of the dictionary.
            break
        analyze(key, values)
if __name__ == "__main__":
codes = find_code_section(retrieve_linked('D:\\final.txt'))
count = 0
for key, values in codes.items():
if count > 10:
break
print_parsed(key, values)
count += 1
\ No newline at end of file
run('D:\\all_answers', 'D:\\postgresql_questions.txt', 'D:\\linked.txt', 'D:\\codes.txt')
"""
DONE:
1. Kapitola ohledně výběru parseru (stránka 10)
2. Automatizace/serializace na základě aktuálního progresu
partially 3. Parser - tabulky s jejich aliasy
OTÁZKY:
1. zahodit pokud sqlglot nedokáže qualify?
2. cíl bakalářky - knihovna
"""
\ No newline at end of file
......@@ -78,7 +78,6 @@ def testing2():
print(mydict)
import sqlparse
def erasing_backslashes():
xd = """SELECT \\\'ALTER TABLE "\\\'||nspname||\\\'"."\\\'||relname||\\\'" DROP CONSTRAINT "\\\'||conname||\\\'";\\\'
......@@ -102,3 +101,75 @@ def erasing_backslashes():
print('Tokens: ')
for token in statement.tokens:
print(f' - {token}')
from sql_metadata import Parser
def sqlmetadata():
    """Try out the sql_metadata ``Parser`` on a few sample statements.

    Prints the tables of ``TABLE hello`` (or a message when ``ValueError``
    is raised), the column aliases of a SELECT with an aliased expression,
    and the tables and columns of a CREATE TABLE statement.
    """
    create_cars = "CREATE TABLE cars (brand VARCHAR(255),model VARCHAR(255),year INT);"

    try:
        hello_tables = Parser('TABLE hello').tables
        print(hello_tables)
    except ValueError:
        print('Disgustingly dumb user')

    aliases = Parser("SELECT a, b + 1 AS c FROM d").columns_aliases
    print(aliases)
    print('--- NEXT ---')
    # A fresh parser is built for each attribute, tables first, then columns.
    for attribute in ('tables', 'columns'):
        print(getattr(Parser(create_cars), attribute))
import sqlglot
import sqlglot.optimizer
import sqlglot.optimizer.qualify
def sqlglottry():
    """Try out sqlglot's ``parse_one`` and ``find_all`` on sample statements.

    Prints ``alias_or_name`` of every ``Table`` node found in the parsed
    INSERT sample, a separator, and then ``alias_or_name`` of every
    ``Column`` node found in a CREATE TABLE statement parsed with the
    postgres dialect.
    """
    escape_inserts = """
insert into EscapeTest (text) values (E'This is the first part
And this is the second');
insert into EscapeTest (text) values (E'This is the first part
And this is the second');
"""
    insert_tree = sqlglot.parse_one(escape_inserts)
    for table in insert_tree.find_all(sqlglot.exp.Table):
        print(table.alias_or_name)

    print('--- NEXT ---')

    statement = "CREATE TABLE cars (brand VARCHAR(255),model VARCHAR(255),year INT);"
    create_tree = sqlglot.parse_one(statement, dialect="postgres")
    for column in create_tree.find_all(sqlglot.exp.Column):
        print(column.alias_or_name)
import copy
def tmp():
    """Qualify a sample join query and print each column with its source.

    The query is parsed with the postgres dialect, qualified in place, and
    a scope is built from it; every column found in that scope is printed
    next to ``root.sources[column.table]``. An ``OptimizeError`` is
    swallowed and the local tree is reset to the copy taken beforehand.
    """
    query = """
SELECT
a,
c
FROM (
SELECT
a,
b
FROM x
) AS x
JOIN (
SELECT
b,
c
FROM y
) AS y
ON x.b = y.b
"""
    tree = sqlglot.parse_one(query, dialect="postgres")
    resolved = False
    try:
        # Copy taken before qualify() is called on the tree.
        backup = copy.deepcopy(tree)
        sqlglot.optimizer.qualify.qualify(tree)
        root = sqlglot.optimizer.build_scope(tree)
        scope_columns = sqlglot.optimizer.find_all_in_scope(
            root.expression, sqlglot.exp.Column
        )
        for column in scope_columns:
            source = root.sources[column.table]
            print(f"{column} => {source}")
        resolved = True
    except sqlglot.errors.OptimizeError:
        tree = backup
if __name__ == '__main__':
    # Only the sqlglot scope experiment runs; the sql_metadata experiment
    # (sqlmetadata()) is currently disabled.
    tmp()
\ No newline at end of file
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment