from pycropml import composition
from pycropml.pparse import model_parser
from path import Path
import networkx as nx
from collections import defaultdict
from IPython.display import Image, display, SVG
from networkx.drawing.nx_pydot import to_pydot
from pycropml.render_cyml import signature
import os
import sys
from pycropml.transpiler.main import Main
from copy import copy
from pycropml.render_cyml import my_input, DATATYPE
import pandas as pd
import importlib
[docs]
class Package:
    """Describe where a package lives on disk.

    Attributes set by the constructor:
        name: the package name, stored as given.
        metainfo: the metadata object, stored as given.
        path: the package directory.
        wralea_path: the package entry file (see ``__init__``).
        crop2ml_path: ``<path>/crop2ml``.
    """

    def __init__(self, name, metainfo, path=None):
        """Resolve the package directory and its entry file.

        Args:
            name: package name.
            metainfo: metadata attached to the package.
            path: optional location. When falsy, the file of the code that
                calls this constructor is used as entry file and its
                directory as package directory. When it names something that
                is not a directory, it is taken as the entry file itself.
                When it names a directory (created if it does not exist),
                the entry file is ``model.py`` inside it.
        """
        self.name = name
        self.metainfo = metainfo

        if not path:
            import inspect

            # stack()[1] is the frame that called this constructor; its
            # second field is that caller's file name.
            call_path = os.path.abspath(inspect.stack()[1][1])
            self.path = os.path.dirname(call_path)
            self.wralea_path = call_path
        else:
            if not os.path.exists(path):
                # makedirs also creates missing parent directories, where
                # mkdir would fail.
                os.makedirs(path, exist_ok=True)

            if not os.path.isdir(path):
                # An existing non-directory: treat it as the entry file.
                self.path = os.path.dirname(path)
                self.wralea_path = path
            else:
                self.path = path
                self.wralea_path = os.path.join(self.path, "model.py")

        self.crop2ml_path = os.path.join(self.path, "crop2ml")
[docs]
class PackageManager:
    """Find packages (directories containing a ``crop2ml`` folder) in a project."""

    def __init__(self, proj):
        """Store the project root and snapshot ``sys.path``.

        Args:
            proj: directory that is walked when looking for packages.
        """
        self.syspath = sys.path[:]
        self.pkg = {}
        self.user_crop2ml_path = set()
        self.sys_crop2ml_path = set()
        self.proj = proj

    def check_exist(self):
        """Placeholder; currently does nothing."""
        pass

    def contain_pkg(self):
        """Return ``{directory name: directory path}`` for every package found.

        A directory counts as a package when it directly contains a
        ``crop2ml`` sub-directory.
        """
        pk = {}
        for root, dirs, _files in os.walk(self.proj):
            if "crop2ml" in dirs:
                # normpath drops a trailing separator, which would otherwise
                # produce an empty name; basename uses the platform separator.
                pk[os.path.basename(os.path.normpath(root))] = root
        return pk

    def get_path(self, pkg):
        """Return the path of package ``pkg``.

        Prints a message and returns ``None`` when no such package is found.
        """
        wf_path = {}
        for key, location in self.contain_pkg().items():
            # Kept from the previous implementation: only the part after the
            # last backslash of a key is used as the lookup name.
            wf_path[key.split('\\')[-1]] = location

        if pkg in wf_path:
            return wf_path[pkg]
        print("pkge doesn't exist")
        return None
[docs]
# Wraps one composite model: it is built from a package's "crop2ml" folder and
# its methods derive the call graph, the execution order and generated code.
class Topology:
# Class-level dictionary, mutated in place (never rebound) by __init__ and
# retrieve, so it is shared by all instances.
# Maps a package name to [package path, parsed composite model].
pkgs = {}
def __init__(self, name, pkg=None):
    """Load the composite model of package ``name``.

    Args:
        name: name of the package / composite model.
        pkg: path of the package. When falsy, the path is read from the
            ``crop2mlpath`` attribute of the importable module ``name`` if
            it has one; otherwise it is asked for interactively.

    The statement order below is kept exactly as before because the helper
    methods called here read attributes set by earlier statements.
    """
    self.name = name

    # Resolve the package location.
    if pkg:
        self.pkg = pkg
    elif self.isPackage(self.name):
        self.pkg = self.load_pkge(self.name).crop2mlpath
    else:
        self.pkg = input(f"Give the path of package {self.name}")

    self.data = Path(self.pkg)/"crop2ml"
    self.diff_in, self.diff_out = {}, {}

    # First composition file found in the crop2ml folder, or None.
    found = self.data.glob("composition*.xml")
    composite_file = found[0] if found else None

    self.mu = model_parser(self.pkg)
    # The parser result is unpacked as a one-element sequence.
    self.model, = composition.model_parser(composite_file)
    self.pkgs[self.name] = [self.pkg, self.model]

    composite = self.model
    composite.inputs = self.meta_inp(self.name)
    composite.outputs = self.meta_out(self.name)
    composite.ext = self.meta_ext(self.name)
    composite.states = self.findstates(composite.inputs, composite.outputs)
    composite.path = Path(self.pkg)

    self.minout()
    self.path_pkg = None
    composite.diff_in = self.diff_in
    composite.diff_out = self.diff_out
    self.composite_file = composite_file
[docs]
def isPackage(self, name):
    """Tell whether ``name`` is an importable module exposing ``crop2mlpath``.

    Args:
        name: module name to look up.

    Returns:
        True when a module spec is found for ``name`` and the module loaded
        through ``self.load_pkge`` has a ``crop2mlpath`` attribute; False
        otherwise, including when the lookup itself fails.
    """
    # `import importlib` alone does not guarantee that the `util` submodule
    # is loaded, so import it explicitly.
    import importlib.util

    try:
        spec = importlib.util.find_spec(name)
    except (ImportError, ValueError):
        # Raised e.g. for a dotted name whose parent is missing, or for an
        # empty name: treat as "not a package".
        return False

    if spec is None:
        return False
    # load_pkge returns None on import failure; hasattr(None, ...) is False.
    return hasattr(self.load_pkge(name), "crop2mlpath")
[docs]
def findstates(self, inputs, outputs):
    """Collect the state variables among ``inputs`` and ``outputs``.

    Every input whose ``variablecategory`` is ``"state"`` is kept, followed
    by the state outputs whose name has not been collected yet.
    """
    def is_state(var):
        return "variablecategory" in dir(var) and var.variablecategory == "state"

    states = [var for var in inputs if is_state(var)]
    seen_names = [var.name for var in states]

    for var in outputs:
        if is_state(var) and var.name not in seen_names:
            states.append(var)
            seen_names.append(var.name)
    return states
[docs]
def load_pkge(self, name):
    """Import module ``name`` and return it.

    Any exception raised by the import is printed and ``None`` is returned.
    """
    try:
        return importlib.import_module(name)
    except Exception as err:
        print(err)
        return None
[docs]
def createGraph(self):
    """Build the directed graph of the models of the composite.

    Without internal links, each model is given a single edge to itself.
    Otherwise an edge ``source -> target`` is added for every pair of models
    connected by at least one internal link, weighted by the number of links.
    """
    composite = self.model

    if not composite.internallink:
        adjacency = defaultdict(list)
        for unit in composite.model:
            adjacency[unit.name].append(unit.name)
        return nx.DiGraph(adjacency, name=composite.name)

    graph = nx.DiGraph(name=composite.name)
    weighted_edges = []
    for unit in composite.model:
        # Targets of every link leaving this model (one entry per link).
        targets = []
        for link in composite.internallink:
            origin = link["source"].split(".")[0]
            dest = link["target"].split(".")[0]
            if origin == unit.name:
                targets.append(dest)
        if not targets:
            continue

        frame = pd.DataFrame({"target": targets, "value": [1] * len(targets)})
        for dest in frame["target"].unique():
            link_count = frame.loc[frame["target"] == dest, "value"].sum()
            weighted_edges.append((unit.name, dest, link_count))

    graph.add_weighted_edges_from(weighted_edges)
    return graph
[docs]
def create_edgeInOut(self):
    """Map each source model to the ``[output, input]`` pairs that are renamed.

    Only internal links whose source variable name differs from the target
    variable name are recorded.
    """
    renames = defaultdict(list)
    for link in self.model.internallink:
        source_parts = link["source"].split(".")
        out_var = source_parts[1]
        producer = source_parts[0]
        in_var = link["target"].split(".")[1]
        if out_var == in_var:
            continue
        renames[producer].append([out_var, in_var])
    return renames
[docs]
def topologicalSort(self):
    """Compute the execution order of the models and store it in ``model.ord``.

    Returns:
        The ordered list of model names when the composite has internal
        links. Without links, the name of the first model is printed,
        ``model.ord`` is set to a list holding only that name, and ``None``
        is returned.
    """
    composite = self.model
    if composite.internallink:
        order = list(nx.topological_sort(self.createGraph()))
        composite.ord = order
        return order

    print(composite.model[0].name)
    composite.ord = [composite.model[0].name]
    return None
[docs]
def info_minout(self):
    """Return ``{model name: (inputs, outputs)}`` for the models of the composite.

    For a model unit of this package (file name starting with ``unit`` and no
    package name) the variables come from the parsed units in ``self.mu``;
    for any other model they come from ``meta_inp`` / ``meta_out`` of its
    package.
    """
    variables = {}
    for entry in self.model.model:
        local_unit = entry.file.split(".")[0] == "unit" and entry.package_name is None
        if not local_unit:
            pkgname = entry.package_name
            variables[entry.name] = (self.meta_inp(pkgname), self.meta_out(pkgname))
            continue
        for parsed in self.mu:
            if parsed.name == entry.name:
                variables[parsed.name] = (parsed.inputs, parsed.outputs)
    return variables
[docs]
def minout(self):
    """Fill in the models of the composite and return their variable names.

    Each entry of ``self.model.model`` is completed in place: a model unit of
    this package receives the attributes of the matching parsed unit in
    ``self.mu``; any other model receives the inputs/outputs of its package
    and the links of the composition file found under ``self.path_pkg``.

    Returns:
        ``{model name: (input names, output names)}``.
    """
    names = {}
    for idx, entry in enumerate(self.model.model):
        if entry.file.split(".")[0] == "unit" and entry.package_name is None:
            for parsed in self.mu:
                if parsed.name != entry.name:
                    continue
                in_names = [var.name for var in parsed.inputs]
                out_names = [var.name for var in parsed.outputs]
                names[parsed.name] = (in_names, out_names)

                slot = self.model.model[idx]
                slot.inputs = parsed.inputs
                slot.outputs = parsed.outputs
                slot.description = parsed.description
                slot.parametersets = parsed.parametersets
                slot.testsets = parsed.testsets
                slot.path = parsed.path
                slot.function = parsed.function
                slot.initialization = parsed.initialization
                slot.algorithms = parsed.algorithms
        else:
            pkgname = entry.package_name
            in_names = [p.name for p in self.meta_inp(pkgname)]
            out_names = [p.name for p in self.meta_out(pkgname)]
            names[entry.name] = (in_names, out_names)

            slot = self.model.model[idx]
            slot.inputs = self.meta_inp(pkgname)
            slot.outputs = self.meta_out(pkgname)

            crop2ml_dir = Path(os.path.join(self.path_pkg, "crop2ml"))
            composite_file = crop2ml_dir.glob("composition*.xml")[0]
            mc, = composition.model_parser(composite_file)
            slot.description = mc.description
            slot.inputlink = mc.inputlink
            slot.outputlink = mc.outputlink
            slot.internallink = mc.internallink
            slot.path = self.path_pkg
            # Parsed a second time so that `model` is a separate object.
            slot.model, = composition.model_parser(composite_file)
    return names
[docs]
def algorithm(self):
    """Generate the sequence of model calls of the composite.

    One line ``outputs = model_<name>(inputs)`` is produced per model, in
    topological order when the composite has internal links and in
    declaration order otherwise. After the call of a model, one assignment
    ``target_input = source_output`` is added for each of its internal links
    whose two variable names differ.

    Returns:
        The generated code as a string (not indented).
    """
    # Both results are invariant for the whole generation: compute them once
    # instead of once (or twice) per model.
    order = self.topologicalSort()
    names = self.minout()

    def call_line(model_name):
        inputs, outputs = names[model_name][0], names[model_name][1]
        fn = model_name.strip().replace(' ', '_').lower()
        return f"{', '.join(outputs)} = model_{fn}({','.join(inputs)})\n"

    # topologicalSort returns None when there is no internal link.
    if not order:
        return "".join(call_line(mod.name) for mod in self.model.model)

    edge = self.create_edgeInOut()
    parts = []
    for v in order:
        # TODO: composite and unit models used to go through two branches
        # that emitted the same line; decide whether they should differ.
        parts.append(call_line(v))
        for val in edge.get(v, ()):
            parts.append(f"{val[1]} = {val[0]}\n")
    return "".join(parts)
[docs]
def write_xml(self):
    """Write the model graph as GraphML to ``<model name>.xml``."""
    graph = self.createGraph()
    nx.write_graphml_xml(graph, f"{self.model.name}.xml")
[docs]
def write_png(self, dir_images):
    """Render the model graph as PNG and dump its edges to CSV.

    The image is written to ``<dir_images>/<model name>.png`` (the path is
    printed first) and the edge table to ``graphe.csv`` in the directory
    returned by ``os.path.dirname(dir_images)``.
    """
    graph = self.createGraph()
    dot = to_pydot(graph)
    png_path = Path(os.path.join(dir_images, f"{self.model.name}.png"))
    print(png_path)

    edges = pd.DataFrame(graph.edges(data=True), columns=['Source', 'Target', 'Weight'])
    # The third column holds the edge attribute dict; keep only its weight.
    edges['Weight'] = edges['Weight'].map(lambda attrs: attrs['weight'] if 'weight' in attrs else 0)

    dot.write_png(png_path)
    csv_path = os.path.join(os.path.dirname(dir_images), 'graphe.csv')
    edges.to_csv(csv_path, index=False)
[docs]
def writeSVG(self):
    """Write the model graph as SVG under ``<model path>/doc/images``."""
    dot = to_pydot(self.createGraph())
    svg_path = f'{self.model.path}/doc/images/{self.model.name}.svg'
    dot.write_svg(svg_path)
[docs]
def display_wf(self):
    """Display the model graph as a PNG image (IPython display)."""
    png_bytes = to_pydot(self.createGraph()).create_png()
    display(Image(png_bytes))
[docs]
def display_wf_svg(self):
    """Display the model graph as an SVG image (IPython display)."""
    svg_data = to_pydot(self.createGraph()).create_svg()
    display(SVG(svg_data))
[docs]
def algo2cyml(self, dir_images=None):
    """Generate the CyML source code of the composite model.

    The generated text contains the import lines of every model of the
    composite, the ``model_<name>`` function (declarations, input
    assignments, model calls, output assignments, return statement) and,
    when applicable, an ``init_<name>`` function.

    Args:
        dir_images: when truthy, the model graph is also written there
            through ``write_png``.

    Returns:
        The generated code as a string.
    """
    if dir_images:
        self.write_png(dir_images)

    tab = ' '*4
    code = 'from datetime import datetime\nfrom math import *\n'

    # One import line per model; models of another package are imported
    # from that package, the others from this one.
    for mod in self.model.model:
        sig = signature(mod)
        owner = mod.package_name if mod.package_name else self.name
        n = owner.replace("-", "_")
        code += f'from {n}.{sig} import model_{sig}'
        code += f', init_{sig}\n' if mod.initialization else '\n'

    # Main function.
    name = signature(self.model)
    sig_params = ",\n ".join(map(my_input, self.meta_inp(self.name)))
    code += f"def model_{name}({sig_params}):" + "\n"
    code += self.decl(defa=False)
    code += self.inps_assignment()
    lines = [tab+l for l in self.algorithm().split('\n') if l.split()]
    lines.append(self.outs_assignment())
    code += '\n'.join(lines)
    code += "\n" + tab + "return " + "(" + ", ".join([out.name for out in self.model.outputs]) + ")"

    # Initialization function.
    # NOTE(review): the indentation of this listing was lost; the `else`
    # below is read as belonging to `if self.model.initialization` (the
    # inner test on the file content was always true) — confirm.
    if self.model.initialization:
        file_init = self.model.initialization[0].filename
        path_init = Path(os.path.join(self.pkg, "crop2ml", file_init))
        with open(path_init, 'r') as f:
            code_init = f.read()
        lines = [tab+l for l in code_init.split('\n') if l.split()]
        code += self.generate_function_signature(self.model) + '\n'
        code += self.val_init(self.model)
        code += '\n'.join(lines)
        code += '\n' + tab + 'return ' + ', '.join(self.listab) + '\n'
    else:
        # Calls to the init function of every model that has one; computed
        # once and reused.
        init_calls = self.initialization2()
        if init_calls:
            code += self.generate_function_signature2(self.model) + '\n'
            code += self.val_init2(self.model)
            code += init_calls
            code += self.initreturn()
    return code
[docs]
def val_init2(self, model):
    """Generate one ``cdef`` declaration per state variable of ``model``.

    Inputs are scanned before outputs and each state name is declared once.
    """
    state_names = [st.name for st in model.states]
    candidates = model.inputs + model.outputs
    indent = ' '*4
    declared = []
    pieces = []
    for var in candidates:
        if var.name in declared or var.name not in state_names:
            continue
        declared.append(var.name)
        pieces.append(indent+"cdef "+my_input(var, defa=False)+"\n")
    return "".join(pieces)
[docs]
def initreturn(self):
    """Generate the return statement of the initialization function.

    The returned names are, without duplicates and in order of appearance,
    the model inputs that are state variables or private parameters.
    """
    indent = ' '*4
    returned = []
    for mod in self.model.model:
        for var in mod.inputs:
            is_state = "variablecategory" in dir(var) and var.variablecategory == "state"
            if is_state and var.name not in returned:
                returned.append(var.name)
            is_private = "parametercategory" in dir(var) and var.parametercategory == "private"
            if is_private and var.name not in returned:
                returned.append(var.name)
    return f"{indent}return ({', '.join(returned)})\n"
[docs]
def initialization2(self):
    """Generate the calls to the init function of every model that has one.

    For each such model a line ``<results> = init_<model>(<arguments>)`` is
    produced: the arguments are its parameters and exogenous variables, the
    results its state variables and private parameters.
    """
    indent = ' '*4
    statements = []
    for mod in self.model.model:
        if not mod.initialization:
            continue
        arguments = []
        results = []
        for var in mod.inputs:
            attrs = dir(var)
            if "parametercategory" in attrs or var.variablecategory == "exogenous":
                arguments.append(var.name)
            if "variablecategory" in attrs and var.variablecategory == "state":
                results.append(var.name)
            elif "parametercategory" in attrs and var.parametercategory == "private":
                results.append(var.name)
        statements.append(indent + f"{', '.join(results)} = init_{signature(mod)}({', '.join(arguments)})\n")
    return "".join(statements)
[docs]
def generate_function_signature2(self, model):
    """Generate the ``def init_<name>(...):`` header from the sub-models.

    The parameters are the exogenous variables and the parameters found in
    the inputs of the models of ``model``, each name being used once.
    """
    seen_names = []
    init_inp = []
    for mod in model.model:
        for inp in mod.inputs:
            if inp.name in seen_names:
                continue
            exogenous = "variablecategory" in dir(inp) and inp.variablecategory == "exogenous"
            if exogenous or "parametercategory" in dir(inp):
                seen_names.append(inp.name)
                init_inp.append(inp)

    header = f'\n\ndef init_{signature(model)}('
    # Continuation lines are padded by the length of the header.
    padding = len(header)*' '
    arguments = [my_input(inp) for inp in init_inp]
    return header + (',\n' + padding).join(arguments) + '):\n'
[docs]
def generate_function_signature(self, model):
    """Generate the ``def init_<name>(...):`` header of ``model``.

    The parameters are the variables of ``model`` that are not outputs and
    whose name does not end with ``_t1``; each name is used once.

    Args:
        model: object exposing ``inputs`` and ``outputs``.

    Returns:
        The header text, starting with two newlines and ending with
        ``'):\n'``.
    """
    output_names = [o.name for o in model.outputs]

    # Track names (not variable objects) so that duplicates are detected.
    seen_names = set()
    init_inp = []
    for var in model.inputs + model.outputs:
        if var.name in output_names or var.name.endswith("_t1"):
            continue
        if var.name not in seen_names:
            seen_names.add(var.name)
            init_inp.append(var)

    code = f'\n\ndef init_{signature(model)}('
    # Continuation lines are padded by the length of the header.
    separator = ',\n' + len(code)*' '
    code += separator.join([my_input(inp) for inp in init_inp])
    code += '):\n'
    return code
[docs]
def val_init(self, model):
    """Generate the ``cdef`` declarations of the state inputs of ``model``.

    Side effect: ``self.listab`` is reset and filled with the names of the
    state inputs, in order and without duplicates (including those whose
    datatype has no declaration template).

    Args:
        model: object exposing ``inputs`` and ``states``.

    Returns:
        The declarations, one per line, each indented by four spaces like
        the rest of the generated function body.
    """
    tab = ' '*4
    # Declaration template per datatype; datatypes not listed here produce
    # no declaration.
    templates = {
        "INT": "cdef int {name} = 0\n",
        "DOUBLE": "cdef float {name} = 0.0\n",
        "STRING": "cdef str {name} = '' \n",
        "BOOLEAN": "cdef bool {name} = False\n",
        "INTLIST": "cdef intlist {name}\n",
        "DOUBLELIST": "cdef floatlist {name}\n",
        "STRINGLIST": "cdef stringlist {name}\n",
        "DATELIST": "cdef datelist {name} \n",
        "DATE": "cdef datetime {name}\n",
        "INTARRAY": "cdef intarray {name}\n",
        "DOUBLEARRAY": "cdef doublearray {name}\n",
        "STRINGARRAY": "cdef stringarray {name}\n",
    }

    statenames = [st.name for st in model.states]
    self.listab = []
    code = ""
    for inp in model.inputs:
        if inp.name in self.listab or inp.name not in statenames:
            continue
        self.listab.append(inp.name)
        template = templates.get(inp.datatype)
        if template is not None:
            code += tab + template.format(name=inp.name)
    return code
[docs]
def retrieve(self, pkgname):
    """Return ``(package path, composite model)`` for package ``pkgname``.

    The pair is read from ``self.pkgs`` when present; otherwise a
    ``Topology`` is built for the package and its pair is stored there.
    """
    if pkgname not in self.pkgs:
        topo = Topology(name=pkgname)
        self.pkgs[pkgname] = [topo.pkg, topo.model]
        return topo.pkg, topo.model

    entry = self.pkgs[pkgname]
    return entry[0], entry[1]
[docs]
def check_compo(self, mc, m):
    """Tell whether the model named ``m`` in ``mc`` is a composite model.

    A model counts as composite when its file name starts with
    ``composition`` or when it has a package name.
    """
    return any(
        bool(mod.file.split(".")[0] == "composition" or mod.package_name)
        for mod in mc.model
        if mod.name == m
    )
[docs]
def pkg_m(self,mc, m):
    """Return the package name of the model named ``m`` in ``mc``.

    When several models carry that name the last one wins; ``None`` is
    returned when there is none.
    """
    matches = [mod.package_name for mod in mc.model if mod.name == m]
    return matches[-1] if matches else None
"""
#get the path of the package
def path_pkg(self, mc,m):
name = self.pkg_m(mc,m)
ppkg = PackageManager(self.path).get_path(name)
return ppkg
"""
[docs]
def info_outputs_mu(self, ppkg, mu, varname):
    """Return the output named ``varname`` of model unit ``mu`` in package ``ppkg``.

    The package is parsed with ``model_parser``; ``None`` is returned when
    no such unit or output exists.
    """
    for unit in model_parser(ppkg):
        if unit.name != mu:
            continue
        for candidate in unit.outputs:
            if candidate.name == varname:
                return candidate
    return None
[docs]
# Looks for `varname` among the sources of the input links of the composite
# model of package `pkgname` and fetches the matching description through
# `info_inputs_mu`.
# NOTE(review): the indentation of this listing was lost, so the nesting of the
# statements below cannot be confirmed from here.
def get_mu_inp(self, pkgname, varname):
"""get the model unit from an input of a composite model"""
listvar = []
# retrieve returns (package path, composite model)
ppkg, mc = self.retrieve(pkgname)
for inp in mc.inputlink:
# the link source is the composite input, the target is "<model>.<variable>"
var = inp["source"]
mod = inp["target"].split(".")[0]
if var == varname and var not in listvar:
if self.check_compo(mc,mod):
# the target model is itself composite: switch to its package
ppkg, mc = self.retrieve(self.pkg_m(mc, mod))
# NOTE(review): this passes the package path where the first parameter is
# named `pkgname`, and the result of the call is not used — confirm intent.
self.get_mu_inp(ppkg, var)
# NOTE(review): `info_inputs_mu` is not defined in the visible part of this file.
# NOTE(review): this rebinds the loop variable `inp`.
inp = self.info_inputs_mu(ppkg, mod, var)
listvar.append(var)
# NOTE(review): `inp` is unbound here when `mc.inputlink` is empty.
return inp
[docs]
# Looks for `varname` among the targets of the output links of the composite
# model of package `pkgname` and fetches the matching description through
# `info_outputs_mu`.
# NOTE(review): the indentation of this listing was lost, so the nesting of the
# statements below cannot be confirmed from here.
def get_mu_out(self, pkgname, varname):
listvar = []
# retrieve returns (package path, composite model)
ppkg, mc = self.retrieve(pkgname)
for out in mc.outputlink:
# the link target is the composite output, the source is "<model>.<variable>"
var = out["target"]
mod = out["source"].split(".")[0]
if var == varname and var not in listvar:
if self.check_compo(mc,mod):
# the source model is itself composite: switch to its package
ppkg, mc = self.retrieve(self.pkg_m(mc, mod))
# NOTE(review): this passes the package path where the first parameter is
# named `pkgname`, and the result of the call is not used — confirm intent.
self.get_mu_out(ppkg, var)
# NOTE(review): this rebinds the loop variable `out`.
out = self.info_outputs_mu(ppkg, mod, var)
listvar.append(var)
# NOTE(review): `out` is unbound here when `mc.outputlink` is empty.
return out
"""def get_mu_out(self,ppkg, mc, varname):
listvar=[]
data = Path(ppkg)/"crop2ml"
composite_file = data.glob("composition.%s*.xml"%mc)[0]
mc, = composition.model_parser(composite_file)
for out in mc.outputlink:
var = out["target"]
mod = out["source"].split(".")[0]
if var==varname and var not in listvar:
if self.check_compo(mc,mod)==True:
ppkg, mc = self.retrive(self.pkg_m(mc, mod))
self.get_mu_out(ppkg, mod, var)
inp = self.info_outputs_mu(ppkg,mod,var)
listvar.append(var)
return inp """
[docs]
def decl(self, defa=True):
    """Generate the ``cdef`` declarations of the local variables.

    Every input or output of the models of the composite whose name is not
    an input of the composite itself is declared once. Inputs are always
    declared with ``defa=False``; outputs use the ``defa`` argument.
    """
    info_var = self.info_minout()
    known = [m.name for m in self.meta_inp(self.name)]
    indent = ' '*4
    chunks = []
    for pair in info_var.values():
        for var in pair[0]:
            if var.name in known:
                continue
            chunks.append(f"{indent}cdef " + my_input(var, defa=False) + "\n")
            known.append(var.name)
        for var in pair[1]:
            if var.name in known:
                continue
            chunks.append(f"{indent}cdef " + my_input(var, defa) + "\n")
            known.append(var.name)
    return "".join(chunks)
[docs]
def inps_assignment(self):
    """Generate ``model_input = composite_input`` lines for renamed inputs.

    Only input links whose two variable names differ produce a line.
    """
    indent = ' '*4
    statements = []
    for link in self.model.inputlink:
        composite_var = link["source"]
        model_var = link["target"].split(".")[1]
        if composite_var == model_var:
            continue
        statements.append(f"{indent}{model_var} = {composite_var} \n")
    return "".join(statements)
[docs]
def outs_assignment(self):
    """Generate ``composite_output = model_output`` lines for renamed outputs.

    Only output links whose two variable names differ produce a line.
    """
    indent = ' '*4
    statements = []
    for link in self.model.outputlink:
        composite_var = link["target"]
        model_var = link["source"].split(".")[1]
        if composite_var == model_var:
            continue
        statements.append(f"{indent}{composite_var} = {model_var}\n")
    return "".join(statements)
[docs]
def compotranslate(self, language):
    """Translate the generated CyML code of the composite into ``language``.

    The code produced by ``algo2cyml`` is handed to the transpiler ``Main``
    and the translated text is returned.
    """
    cyml_code = self.algo2cyml()
    transpiler = Main(cyml_code, language, self.model, self.model.name)
    transpiler.parse()
    transpiler.to_ast(cyml_code)
    return transpiler.translate()
[docs]
def translate_all(self, model):
    """Walk recursively through the models of ``model`` that come from a package.

    For each such model a ``Topology`` is built for its package, then the
    composite model of that package is visited in turn.
    """
    for mod in model.model:
        if mod.package_name is None:
            continue
        # The instance is not used afterwards; it is still constructed, as before.
        Topology(mod.package_name)
        self.translate_all(self.retrieve(mod.package_name)[1])
[docs]
def translate(self):
    """Run ``translate_all`` on the composite model of this topology."""
    composite = self.model
    self.translate_all(composite)
'''
from pycropml.topology import Topology
from pycropml.transpiler.generators.openaleaGenerator import OpenaleaCompo
pkg1 = "C:/Users/midingoy/Documents/THESE/pycropml_pheno/test/Models/energybalance_pkg"
T = Topology(name='energybalance', pkg=pkg1)
a =OpenCompo(tree =None, model = T.model)
"C:/Users/midingoy/Documents/THESE/pycropml_pheno/test/Tutorial/testA"
print(T.compotranslate('cs'))
T.translate()
algo = T.algo2cyml
aCsharpCompo(tree =None, model = T.model)
T.translate()
print(T.algo2cyml())
print(T.compotranslate('cs'))
T.translate_all()
T.write_png()
T.decl()
T.model.model[0].package_name
print(T.algo2cyml())
from pycropml.cyml import transpile_package
transpile_package(pkg, "py")
'''