"""Add License, Header.
Use pkglts
Problems:
- name of a model unit?
"""
from __future__ import print_function
from __future__ import absolute_import
import os.path
from os.path import isdir
import sys
from path import Path
import six
import shutil
from datetime import datetime
from pycropml.modelunit import ModelUnit
from . import error
from . import split_function
DATATYPE = {}
DATATYPE['INT'] = "int"
DATATYPE['STRING'] = "str"
DATATYPE['DOUBLE'] = "float"
DATATYPE['DOUBLELIST'] = "list"
DATATYPE['INTLIST'] = "list"
DATATYPE['STRINGLIST'] = "list"
DATATYPE['STRINGARRAY'] = "str"
DATATYPE['CHARLIST'] = "list"
DATATYPE['DATELIST'] = "datelist"
DATATYPE['DOUBLEARRAY'] = "float"
DATATYPE['INTARRAY'] = "int"
DATATYPE['BOOLEAN'] = "bool"
DATATYPE['DATE'] = "datetime"
[docs]
class Model2Package(object):
""" TODO
"""
num = 0
def __init__(self, models, dir=None, pkg_name=None):
"""TODO."""
self.models = models
self.dir = dir
self.with_import = True
if pkg_name is None:
self.pkg_name = "CropModel"
else:
self.pkg_name = pkg_name
self.cwd = Path(self.dir)
self.rep = os.path.abspath(os.path.dirname(self.cwd))
[docs]
def run(self):
"""TODO."""
self.generate_package()
#self.write_tests()
[docs]
def generate_package(self):
"""Generate a Cyml package equivalent to the xml definition.
Args:
- models : a list of model
- dir: the directory where the code is generated.
Returns:
- None or status
"""
# Create a directory (mymodel)
directory = Path(os.path.join(self.cwd, 'pyx'))
if isdir(directory):
self.dir = directory
else:
self.dir = directory.mkdir()
files = []
count = 0
for model in self.models:
self.generate_component(model)
filename = Path(os.path.join(self.dir, f"{signature(model).capitalize()}.pyx"))
with open(filename, "wb") as cyml_file:
# cyml_file.write(self.code.encode('utf-8','ignore'))
cyml_file.write(self.code.encode('utf-8'))
files.append(filename)
model.module_name = str(Path(filename).name)
count += 1
return files
[docs]
def generate_component(self, model_unit):
"""Todo"""
functions = []
if model_unit.modelid.split(".")[0] != "function":
func_name = f"model_{signature(model_unit)}"
else:
func_name = signature(model_unit)
types = [inp.datatype for inp in model_unit.inputs] + [out.datatype for out in model_unit.outputs]
self.code = "import numpy\n"
self.code += "from math import *\n"
if "DATE" in types or "DATELIST" in types:
self.code += "from datetime import datetime\n"
self.code += "\n"
if model_unit.initialization is not None and len(model_unit.initialization) != 0:
self.code += self.initialization(model_unit) + "\n"
self.code += self.generate_function_signature(func_name, model_unit.inputs) + "\n"
self.code += self.generate_function_doc(model_unit) + "\n"
self.code += self.generate_algorithm(model_unit) + "\n"
if model_unit.function:
function_files = [Path(os.path.join(model_unit.path, "crop2ml", function.filename))
for function in model_unit.function if function.language.lower() == "cyml"]
content = split_function.unique_functions(function_files)
self.code += content
self.code += "\n"
return self.code
[docs]
def generate_algorithm(self, model_unit):
outputs = model_unit.outputs
inputs = model_unit.inputs
tab = ' '*4
list_inputs = []
algo = ""
""" we declare all outputs which are not in inputs"""
output_declaration = ""
for inp in inputs:
list_inputs.append(inp.name)
for out in outputs:
if out.name not in list_inputs:
output_declaration += tab+"cdef "+my_input(out, True)+"\n"
for algorithm in model_unit.algorithms:
if (algorithm.language == "Cyml") or (algorithm.language == "cyml"):
algo = algorithm
break
if algo:
development = algo.development
lines = [tab+l for l in development.split('\n') if l.split()]
code = output_declaration
code += '\n'.join(lines)
code += '\n'+tab + 'return ' + ', '.join([o.name for o in outputs]) + '\n\n\n'
self.code = code
else:
raise error.Error("algorithm is not defined in model unit")
return self.code
[docs]
def initialization(self, model_unit):
outputs = model_unit.outputs
inputs = model_unit.inputs
tab = ' '*4
list_inputs = []
outs = []
""" we declare all outputs which are not in inputs"""
output_declaration = ""
other = ""
z = []
for inp in inputs:
if "variablecategory" in dir(inp) and inp.variablecategory == "state":
list_inputs.append(inp.name)
"""if inp.datatype in ("DOUBLE", "FLOAT"):
inp.default = "0.0"
if inp.datatype == "INT":
inp.default = "0" """
output_declaration += tab+"cdef "+my_input(inp, defa=True)+"\n"
outs.append(inp)
if not inp.default:
other += tab + inp.name + " = " + default_value(inp)+"\n"
elif "parametercategory" in dir(inp) and inp.parametercategory == "private":
outs.append(inp)
code = ""
if model_unit.initialization:
file_init = model_unit.initialization[0].filename
path_init = Path(os.path.join(model_unit.path, "crop2ml", file_init))
par = []
for inp in inputs:
if "parametercategory" in dir(inp):
par.append(inp)
elif inp.variablecategory == "exogenous":
par.append(inp)
with open(path_init, 'r') as f:
code_init = f.read()
if code_init is not None:
lines = [tab+l for l in code_init.split('\n') if l.split()]
code += self.generate_function_signature(f"init_{signature(model_unit)}", par) + '\n'
code += output_declaration
code += other
code += '\n'.join(lines)
code += '\n'+tab + 'return ' + ', '.join([o.name for o in outs]) + '\n'
return code
# documentation
[docs]
def generate_function_doc(self, model_unit):
return f'''\
"""
{generate_doc(model_unit)}
"""
'''
[docs]
def generate_function_signature(self, func_name,inputs):
#inputs = model_unit.inputs
# Compute name from title.
# We need an explicit name rather than infering it from Title
#name = desc.Title
code = f'def {func_name}('
code_size = len(code)
#_input_names = [inp.name.lower() for inp in inputs]
ins = [my_input(inp, False) for inp in inputs]
separator = ',\n' + code_size*' '
code += separator.join(ins)
code += '):'
return code
[docs]
def generate_test(self, model_unit):
tab = ' '*4
m = model_unit
model_name = signature(m)
psets = m.parametersets
self.codetest = ""
for v_tests in m.testsets:
test_runs = v_tests.test # different run in the thest
test_paramsets = v_tests.parameterset # name of paramsets
# map the paramsets
params = {}
if test_paramsets not in list(psets.keys()):
print(f'Unknown parameter {test_paramsets}')
else:
params.update(psets[test_paramsets].params)
for each_run in test_runs:
test_codes = []
# make a function that transforms a title into a function name
tname = list(each_run.keys())[0].replace(' ', '_')
tname = tname.replace('-', '_')
run, inouts = list(each_run.items())[0]
ins = inouts['inputs']
outs = inouts['outputs']
#print(outs.keys())
code = '\n'
test_codes.append(code)
code = f"def test_{tname}():"
test_codes.append(code)
code = f" params = {model_name}("
test_codes.append(code)
run_param = params.copy()
run_param.update(ins)
for k, v in six.iteritems(run_param):
code = f" {k} = {v},"
test_codes.append(code)
code = " )"
test_codes.append(code)
outnames = list(outs.keys())
for j, k in enumerate(m.outputs):
if k.name in outnames:
if k.datatype.strip() in ("STRINGLIST", "DATELIST", "STRINGARRAY", "DATEARRAY"):
code = tab + f"{k.name}_estimated = params[{j}]" if len(m.outputs) > 1 \
else tab + f"{k.name}_estimated = params"
test_codes.append(code)
code = tab + f"{k.name}_computed = {outs[k.name][0]}"
test_codes.append(code)
code = tab + f"assert np.all({k.name}_estimated == {k.name}_computed)"
test_codes.append(code)
if k.datatype.strip() in ("STRING", "BOOL", "INT", "DATE"):
code = tab + f"{k.name}_estimated = params[{j}]" if len(m.outputs) > 1 \
else tab + f"%{k.name}estimated = params"
test_codes.append(code)
code = tab + f"{k.name}_computed = {outs[k.name][0]}"
test_codes.append(code)
code = tab+ "assert (%s_estimated == %s_computed)"%(k.name,k.name)
test_codes.append(code)
if k.datatype.strip() in ("DOUBLELIST", "DOUBLEARRAY"):
code = tab + f"{k.name}_estimated = np.around(params[{j}], {outs[k.name][1]})"\
if len(m.outputs) > 1 \
else tab + f"{k.name}_estimated = np.around(params, {outs[k.name][1]})"
test_codes.append(code)
code = tab + f"{k.name}_computed = {outs[k.name][0]}"
test_codes.append(code)
code = tab + f"assert np.all({k.name}_estimated == {k.name}_computed)"
test_codes.append(code)
if k.datatype.strip() in ("INTLIST", "INTARRAY"):
code = tab + f"{k.name}_estimated = params[{j}]" if len(m.outputs) > 1 \
else tab + f"{k.name}_estimated = params"
test_codes.append(code)
code = tab + f"{k.name}_computed = {outs[k.name][0]}"
test_codes.append(code)
code = tab + f"assert np.all({k.name}_estimated == {k.name}_computed)"
test_codes.append(code)
if k.datatype.strip() == "DOUBLE":
code = tab + f"{k.name}_estimated = round(params[{j}], {outs[k.name][1]})"\
if len(m.outputs) > 1 \
else tab + f"{k.name}_estimated = round(params, {outs[k.name][1]})"
test_codes.append(code)
code = tab + f"{k.name}_computed = {outs[k.name][0]}"
test_codes.append(code)
code = tab + f"assert ({k.name}_estimated == {k.name}_computed)"
test_codes.append(code)
code = '\n'.join(test_codes)
self.codetest += code
return self.codetest
[docs]
def generate_func_test(self, model_unit):
pass
[docs]
def write_tests(self):
"""
TODO: Manage several models rather than just one.
"""
self.rep = Path(os.path.join(self.rep, 'test', 'pyx'))
if not isdir(self.rep):
self.rep.mkdir()
files = []
count = 0
for model in self.models:
codetest = self.generate_test(model)
filename = Path(os.path.join(self.rep, f"test_{signature(model).capitalize()}.pyx"))
codetest = f"""\
#'Test generation'
from {signature(model)} import *
from math import *
import numpy
{codetest}"""
with open(filename, "wb") as cyml_file:
cyml_file.write(codetest.encode('utf-8'))
files.append(filename)
count += 1
return files
[docs]
def signature(model: ModelUnit):
"""_summary_
Args:
model (ModelUnit): A Python object of a Crop2ML model Unit
Returns:
str: name
"""
name = model.name
name = name.strip()
name = name.replace(' ', '_').lower()
return name
[docs]
def generate_doc(model: ModelUnit):
desc = model.description
return f"""\
{desc.Title}
Author: {desc.Authors}
Reference: {desc.Reference}
Institution: {desc.Institution}
ExtendedDescription: {desc.ExtendedDescription}
ShortDescription: {desc.ShortDescription}"""
[docs]
def transfDate(type, elem):
ser = elem.split("/")
if len(ser) == 3:
year, month, day = ser[0], ser[1], ser[2]
return f"datetime({year}, {month}, {day}) "
if len(ser) == 4:
year, month, day, hour= ser[0], ser[1], ser[2], ser[3]
return f"datetime({year}, {month}, {day}, {hour}) "
if len(ser) == 5:
year, month, day, hour, min = ser[0], ser[1], ser[2],ser[3], ser[4]
return f"datetime({year}, {month}, {day}, {hour}, {min}) "
if len(ser) == 6:
year, month, day, hour, min, sec = ser[0], ser[1], ser[2], ser[3], ser[4], ser[5]
return f"datetime({year}, {month}, {day}, {hour}, {min}, {sec}) "
[docs]
def transfDateList(type, elem):
res = ""
for dat in eval(elem):
t = transfDate("DateTime", dat)
res += t + ","
return f"[{res}]"
[docs]
def transBool(type, elem):
return elem.lower().capitalize()
[docs]
def transf(type_, elem):
if type_ == "DATE":
return transfDate(type_, elem)
elif type_ == "DATELIST":
return transfDateList(type_, elem)
elif type_ == "BOOLEAN":
return transBool(type_, elem)
else:
return elem
[docs]
def default_value(inp):
type_ = inp.datatype
if type_.endswith("LIST"): return "[]"
elif type_ == "INT": return "0"
elif type_ =="DOUBLE": return "0.0"
elif type_ == "DATE": return "None"
elif type_.endswith("ARRAY") and not inp.len: return "None"
elif type_.endswith("ARRAY") and inp.len:
if type_=="INTARRAY": return f"array('i', [0]*{inp.len})"
if type_=="DOUBLEARRAY": return f"array('f', [0.0]*{inp.len})"
DATATYPE2 = {}
DATATYPE2['INT'] = "int"
DATATYPE2['STRING'] = "str"
DATATYPE2['DOUBLE'] = "float"
DATATYPE2['DOUBLELIST'] = "floatlist"
DATATYPE2['INTLIST'] = "intlist"
DATATYPE2['STRINGLIST'] = "stringlist"
DATATYPE2['STRINGARRAY'] = "stringarray"
DATATYPE2['CHARLIST'] = "stringlist"
DATATYPE2['DATELIST'] = "datelist"
DATATYPE2['DOUBLEARRAY'] = "floatarray"
DATATYPE2['INTARRAY'] = "intarray"
DATATYPE2['BOOLEAN'] = "bool"
DATATYPE2['DATE'] = "datetime"