# coding: utf-8
from __future__ import absolute_import
from __future__ import print_function
import os
from os.path import isdir
from path import Path
from copy import deepcopy
from typing import *
from collections import defaultdict
from pycropml.transpiler.antlr_py.to_CASG import to_dictASG, to_CASG
from pycropml.transpiler.antlr_py.bioma.biomaExtraction import BiomaExtraction
from pycropml.transpiler.pseudo_tree import Node
from pycropml.transpiler.antlr_py.csharp import cs_cyml
from pycropml.transpiler.generators.cymlGenerator import CymlGenerator
from pycropml.transpiler.ast_transform import transform_to_syntax_tree
from pycropml.transpiler.antlr_py.generateCyml import writeCyml
from pycropml.transpiler.antlr_py.createXml import Pl2Crop2ml
from pycropml.transpiler.antlr_py import repowalk
from pycropml.transpiler.antlr_py.csharp.csharp_preprocessing import ExprStatNode, TransformLocal, CheckingInOut, Custom_call, Declarations,Assignment, Member_access, Binary_op, Index, Local
"""Read a BioMA component (C# sources) and extract its metadata and algorithms
to build a Crop2ML package.
"""
# Maps a BioMA VarInfo "ValueType" tag to the node ``type`` used in the ASG.
# List and array tags collapse to their container kind only; the element type
# is carried separately by ``pseudo_type_``.
type_ = {
    # scalars
    "DOUBLE": "double",
    "INT": "int",
    "STRING": "string",
    "DATE": "date",
    # lists
    "DOUBLELIST": "List",
    "INTLIST": "List",
    "STRINGLIST": "List",
    "DATELIST": "List",
    # boolean
    "BOOLEAN": "bool",
    # arrays
    "DOUBLEARRAY": "array",
    "INTARRAY": "array",
}
# Maps a BioMA VarInfo "ValueType" tag to the ``pseudo_type`` stored on ASG
# nodes: a plain string for scalars, or a [container, element type] pair for
# lists and arrays.
pseudo_type_ = {
    # scalars
    "DOUBLE": "double",
    "INT": "int",
    "STRING": "string",
    "DATE": "date",
    # lists
    "DOUBLELIST": ["List", "double"],
    "INTLIST": ["List", "int"],
    "STRINGLIST": ["List", "string"],
    "DATELIST": ["List", "date"],
    # boolean
    "BOOLEAN": "bool",
    # arrays
    "DOUBLEARRAY": ["array", "double"],
    "INTARRAY": ["array", "int"],
}
def create_package(output):
    """Create the Crop2ML package skeleton under *output*.

    The directory tree ``<output>/crop2ml/algo/pyx`` is created; directories
    that already exist are left untouched, and missing parents (including
    *output* itself) are created as needed.

    Args:
        output: path of the directory that will contain the ``crop2ml`` folder.

    Returns:
        tuple: ``(crop2ml_rep, cyml_rep)`` where ``crop2ml_rep`` is the
        ``crop2ml`` directory and ``cyml_rep`` is the ``crop2ml/algo/pyx``
        directory, both as ``Path`` objects.
    """
    crop2ml_rep = Path(os.path.join(output, 'crop2ml'))
    algo_rep = Path(os.path.join(crop2ml_rep, 'algo'))
    cyml_rep = Path(os.path.join(algo_rep, 'pyx'))
    # One call creates the whole chain; exist_ok avoids the check-then-create
    # race of testing isdir() before each mkdir().
    os.makedirs(cyml_rep, exist_ok=True)
    return crop2ml_rep, cyml_rep
def _varinfo_matches(declared, member):
    """Yield the metadata entries of *declared* describing *member*.

    A trailing ``_t1`` suffix on *member* is dropped before comparing it with
    the (bytes) ``"Name"`` field of each entry.
    """
    vn = member[:-3] if (len(member) > 3 and member.endswith("_t1")) else member
    for m_inp in declared:
        if m_inp["Name"].decode("utf-8") == vn:
            yield m_inp


def redefine_params(m:Node, var_:Tuple, member_category, inputs, outputs)->Tuple[List[Node], List[Node]]:
    """Replace parameters that are domain-class instances by the explicit members used.

    A parameter whose ``pseudo_type`` is one of the values of ``pseudo_type_``
    is kept as is. Any other parameter is looked up in
    ``member_category[m.name]`` to find the members accessed through it; each
    such member that appears in *inputs* (resp. *outputs*) and is described in
    ``var_[0] + var_[2]`` becomes a new input (resp. output) node.

    Args:
        m (Node): Auxiliary function ASG; ``m.name`` and ``m.params`` are read.
        var_ (Tuple): Metadata from strategy classes and varinfo files:
            (inputs, parameters, outputs). Only entries 0 and 2 are searched.
        member_category (dict): ``member_category[function name]`` maps a
            member name to the name of the instance parameter it belongs to.
        inputs (List[str]): Names read by the auxiliary function.
        outputs (List[str]): Names written by the auxiliary function.

    Returns:
        Tuple[List[Node], List[Node]]: ``(inputs_p_node, outputs_p_node)``, the
        new parameter nodes and the nodes (of type ``"local"``) for the
        members written by the function.
    """
    name = m.name
    # Kept as a list: some pseudo types are lists and therefore unhashable.
    basic_types = list(pseudo_type_.values())
    inputs_p_node = []
    outputs_p_node = []
    for p in m.params:
        if p.pseudo_type in basic_types:
            inputs_p_node.append(p)
            continue
        # Members accessed through the instance named p.name.
        members = [key for key, val in member_category[name].items() if val == p.name]
        declared = var_[0] + var_[2]
        # dict.fromkeys removes duplicates like set() did, but keeps the
        # first-occurrence order so the generated signature is reproducible
        # from one run to the next.
        for p_ in dict.fromkeys(inputs):
            if p_ in members:
                for m_inp in _varinfo_matches(declared, p_):
                    inputs_p_node.append(Node(type=type_[m_inp['ValueType']], name=p_, pseudo_type=pseudo_type_[m_inp['ValueType']]))
        for p_ in dict.fromkeys(outputs):
            if p_ in members:
                for m_inp in _varinfo_matches(declared, p_):
                    outputs_p_node.append(Node(type="local", name=p_, pseudo_type=pseudo_type_[m_inp['ValueType']]))
    return inputs_p_node, outputs_p_node
def inst_dclass(meth):
    """Group the parameter names of *meth* by a key derived from their type.

    The key of a parameter is:
      * ``pseudo_type.pseudo_type`` when its pseudo type is a ``Node`` that
        exposes a ``typename`` attribute;
      * the pseudo type itself when it is a string;
      * otherwise the upper-cased second and first items of the pseudo type
        concatenated (e.g. ``["List", "double"]`` gives ``"DOUBLELIST"``).

    Args:
        meth: function node whose ``params`` are inspected.

    Returns:
        dict: key -> list of parameter names (as ``str``), in parameter order.
    """
    grouped = defaultdict(list)
    for param in meth.params:
        ptype = param.pseudo_type
        if isinstance(ptype, Node) and "typename" in dir(ptype):
            key = ptype.pseudo_type
        elif isinstance(ptype, str):
            key = ptype
        else:
            key = ptype[1].upper() + ptype[0].upper()
        grouped[key].append(str(param.name))
    return dict(grouped)
def translate(total_tree, varinfo, algo, not_declared, res_inout=None, member_category=None, instance_dclass=None):
    """Run the C# preprocessing passes over a strategy algorithm.

    The passes are applied in this order: ``Member_access``, ``Index``,
    ``Binary_op``, ``Assignment``, ``Declarations``, ``Custom_call`` and
    ``ExprStatNode``. The external functions called by *algo* are extracted
    first and handed to the last two passes.

    Args:
        total_tree (Node): ASG of all the component.
        varinfo (Node): ASG of the Var info files.
        algo: ASG of the algorithm to transform.
        not_declared: passed through to ``Custom_call``.
        res_inout (dict, optional): passed to ``Custom_call`` and
            ``ExprStatNode``. A new empty dict is used when omitted.
        member_category (dict, optional): passed to ``Custom_call``. A new
            empty dict is used when omitted.
        instance_dclass (dict, optional): passed to ``Custom_call``. A new
            empty dict is used when omitted.

    Returns:
        tuple: ``(declarations_pass, transformed_algo)`` where
        ``declarations_pass`` is the ``Declarations`` instance that processed
        the tree and ``transformed_algo`` is the output of the last pass.
    """
    # None sentinels instead of ``{}`` defaults: a literal dict default would
    # be shared (and possibly mutated) across every call.
    res_inout = {} if res_inout is None else res_inout
    member_category = {} if member_category is None else member_category
    instance_dclass = {} if instance_dclass is None else instance_dclass

    extractor = BiomaExtraction()
    funcs = [f for f in extractor.externFunction(total_tree, algo) if f]

    tree = Member_access(total_tree, varinfo).process(algo)
    tree = Index().process(tree)
    tree = Binary_op().process(tree)
    tree = Assignment().process(tree)
    # The Declarations pass object is returned to the caller.
    declarations = Declarations()
    tree = declarations.process(tree)
    custom_call = Custom_call(total_tree, funcs, not_declared, res_inout, member_category, instance_dclass)
    tree = custom_call.process(tree)
    tree = ExprStatNode(funcs, res_inout).process(tree)
    return declarations, tree
def translate_(f, pa):
    """Process the declarations and locals of *f* and resolve its undeclared names.

    *f* goes through the ``Declarations`` pass and then the ``Local`` pass.
    Names reported as not declared by ``Local`` that are neither parameters of
    *f* nor declared names are searched in *pa*; the first entry whose
    ``"Name"`` matches yields a ``"local"`` node typed from its
    ``"ValueType"``.

    Args:
        f: function node to process; ``f.params`` is read.
        pa: iterable of metadata mappings with a bytes ``"Name"`` and a
            ``"ValueType"`` key.

    Returns:
        tuple: ``(processed_function, declarations, local_nodes)``.
    """
    dr = Declarations()
    dv = dr.process(f)
    lr = Local(dr.declnames)
    lv = lr.process(dv)
    known = [str(p.name) for p in f.params] + dr.declnames
    not_declared = list(set(lr.not_declared) - set(known))
    res_ = []
    for n in not_declared:
        for m in pa:
            if m["Name"].decode("utf-8") == n:
                r = Node(type=type_[m['ValueType']], name=n, pseudo_type=pseudo_type_[m['ValueType']])
                r.type = "local"
                res_.append(r)
                break  # only the first matching entry is used
    return lv, dr.declarations, res_
def _strip_bom(code):
    """Return *code* without a leading UTF-8 byte-order mark, if there is one."""
    # The BOM reads as the single code point U+FEFF when the file was decoded
    # as UTF-8, and as the three characters "\u00ef\u00bb\u00bf" when it was
    # decoded with a single-byte codec such as cp1252. Each form is removed by
    # its own length.
    for bom in ("\ufeff", "\u00ef\u00bb\u00bf"):
        if code.startswith(bom):
            return code[len(bom):]
    return code


def _undeclared_as_locals(names, strategy_params):
    """Build a ``"local"`` node for each of *names* found in *strategy_params*.

    Only the first entry whose ``"Name"`` matches is used; names without a
    matching entry are skipped.
    """
    nodes = []
    for n in names:
        for m in strategy_params:
            if m["Name"].decode("utf-8") == n:
                node = Node(type=type_[m['ValueType']], name=n, pseudo_type=pseudo_type_[m['ValueType']])
                node.type = "local"
                nodes.append(node)
                break
    return nodes


def _append_implicit_return(func_node, outputs):
    """Give *func_node* a return type and a trailing implicit return built from *outputs*.

    A single output is returned directly; otherwise a ``Tuple`` of all
    outputs is returned. The returned value node is handed back to the caller.
    """
    if len(outputs) == 1:
        func_node.return_type = outputs[0].pseudo_type
        value = Node(type = "local", pseudo_type = func_node.return_type, name = outputs[0].name)
    else:
        func_node.return_type = ["Tuple"] + [o.pseudo_type for o in outputs]
        value = Node(type = "Tuple", pseudo_type = func_node.return_type, elements = outputs)
    return_ = Node(type = "implicit_return", value = value)
    func_node.block.append(return_)
    return return_.value


def _write_pyx(cs_ast, filename):
    """Transform *cs_ast* to CyML source and write it to *filename* as UTF-8."""
    nd = transform_to_syntax_tree(cs_ast.transform())
    code = writeCyml(nd)
    with open(filename, "wb") as tg_file:
        tg_file.write(code.encode('utf-8'))


def _write_xml(filename, doctype, xml_):
    """Write *xml_* to *filename*, preceded by the XML declaration and *doctype*."""
    with open(filename, "wb") as xml_file:
        r = '<?xml version="1.0" encoding="UTF-8"?>\n'
        r += doctype
        r += xml_.unicode(indent=4)
        xml_file.write(r.encode())


def run_bioma(component, output):
    """Transform a BioMA component in Crop2ML

    Every ``cs`` file found under *component* is parsed to an ASG and
    classified as composition, strategy, VarInfo or domain class. For each
    strategy, the auxiliary functions it calls (and the functions those call)
    are written to ``<name>.pyx`` files, the strategy algorithm is written to
    ``<strategy>.pyx`` and its model unit to ``unit.<strategy>.xml``. Finally
    the model composition is written to ``composition.<name>.xml``.

    Args:
        component: bioma component path
        output: Crop2ML package path
    """
    crop2ml_rep, cyml_rep = create_package(output)
    pkg = os.path.split(component)[-1].replace('-', '_')
    files = repowalk.walk(component, "cs" )
    asgs = {}
    stra = {}
    straNames = []
    varinfo = {}
    dclass = []
    compo = ""
    for k, v in files.items():
        with open(v, 'r') as src:
            code = src.read()
        if not code:
            continue
        code = _strip_bom(code)
        # Left-strip every line, then drop the lines starting with '#'.
        stripped = (line.lstrip() for line in code.split('\n'))
        code = "\n".join(line for line in stripped if not line.startswith("#"))
        print(k)
        strAsg = to_CASG(to_dictASG(code,"cs"))
        asgs[k] = strAsg
        m = BiomaExtraction()
        m.getTypeNode(strAsg, "classDef")
        g = m.getTree
        # NOTE(review): the result of this call was never used; the call is
        # kept in case getmethod has side effects - confirm and remove.
        m.getmethod(g, "Estimate")
        if not (g and g[0].base):
            continue
        if m.getmethod(g[0], "Estimate"):
            if m.getmethod(g[0], "EstimateOfAssociatedClasses"):
                compo = strAsg
            else:
                # straNames stays index-aligned with the values of stra.
                stra[k] = strAsg
                straNames.append(g[0].name)
        elif g[0].base[0].type=="IVarInfoClass":
            varinfo[k] = strAsg
        elif len(g[0].base) ==2 and g[0].base[1]=="IDomainClass" :
            dclass.append(strAsg)

    total_tree = list(asgs.values())
    vinfo = list(varinfo.values())
    strats = list(stra.values())
    models = []
    func_names = []  # dependant functions already emitted, shared by all strategies
    all_var = BiomaExtraction().getAllVar(vinfo, dclass)

    for k, st in enumerate(strats):
        print(k, st)
        z = BiomaExtraction()
        description = z.description(st)
        var_ = z.getFromVarInfo(st,vinfo, dclass)
        past_cur = z.instancePastCurrent(st)
        # Result unused here; the call is kept since it may update z.
        z.prec_cur_states(st, past_cur)
        var = z.totalvar(st)
        algo = z.getAlgo(st)
        funcs = [fn for fn in z.externFunction(st, algo.block) if fn]
        strat_var = z.getStrategyVar(st)
        pa = strat_var[0]
        dict_pa = {p["Name"].decode("utf-8"):p for p in pa}
        all_var_pa = {**dict_pa, **all_var} # all the variable from all varinfo files and parameters of the specific strategy.
        params_not_declared_ = {}
        member_category = {} # member_category[func1] = {member1:instance1, member2: instance1, ...}
        res_inout = {}
        instance_dclass = {}

        for f in funcs:
            # --- auxiliary function called directly by the strategy ---
            f_rr1 = Member_access(total_tree, inst_dclass(f))
            f_vv1 = f_rr1.process(f)
            member_category[f.name] = f_rr1.m_cat
            vi2 = Index().process(f_vv1)
            dr = Declarations()
            dv = dr.process(vi2)
            lr = Local(dr.declnames)
            lv = lr.process(dv)
            params = [str(p.name) for p in lv.params]
            not_declared = list(set(lr.not_declared) - set(params + dr.declnames))
            undeclared = _undeclared_as_locals(not_declared, pa)
            name = lv.name
            # translate() receives its own list holding the same nodes.
            params_not_declared_[name] = list(undeclared)
            instance_dclass[name] = inst_dclass(lv) # before changing the signature
            res_inout[name] = {"inputs":None, "outputs":None}
            lv.params = lv.params + undeclared
            lv.block.insert(0, dr.declarations)
            r_trans_local = TransformLocal(undeclared).process(lv)
            vv2 = Binary_op().process(r_trans_local)
            p_cust = Custom_call(total_tree, not_declared=undeclared).process(vv2)
            env = {xx.name:xx.pseudo_type for j in dr.declarations for xx in j.decl}
            zz = CheckingInOut(env)
            zz.process(p_cust)  # fills zz.inputs / zz.outputs
            inputs_p_node, outputs_p_node = redefine_params(p_cust,var_, member_category,zz.inputs, zz.outputs)
            p_cust.params = inputs_p_node
            res_inout[name]["inputs"] = inputs_p_node
            if p_cust.return_type == "Void":
                res_inout[name]["outputs"] = _append_implicit_return(p_cust, outputs_p_node)
            r = [p_cust]

            # --- functions the auxiliary function depends on, level by level ---
            frontier = [f]
            while True:
                # NOTE(review): only the externals of the first function of
                # the frontier are followed ([0]); kept as in the original.
                exts = [z.externFunction(st, fn, False, fn.name) for fn in frontier][0]
                exs = [i for i in exts if i ]
                if not exs:
                    break
                for ex in exs:
                    if ex.name in func_names: # to avoid duplicating dependant functions in different auxiliary functions
                        continue
                    res_inout[ex.name] = {"inputs":None, "outputs":None}
                    f_rr1 = Member_access(total_tree, inst_dclass(ex))
                    f_vv1 = f_rr1.process(ex)
                    vi2 = Index().process(f_vv1)
                    dr = Declarations()
                    dv = dr.process(vi2)
                    lr = Local(dr.declnames)
                    lv = lr.process(dv)
                    params = [str(p.name) for p in lv.params]
                    not_declared = list(set(lr.not_declared) - set(params + dr.declnames))
                    undeclared = _undeclared_as_locals(not_declared, pa)
                    params_not_declared_[lv.name] = list(undeclared)
                    r_trans_local = TransformLocal(undeclared).process(lv)
                    vv2 = Binary_op().process(r_trans_local)
                    p_cust = Custom_call(total_tree, not_declared=undeclared).process(vv2)
                    env = {xx.name:xx.pseudo_type for j in dr.declarations for xx in j.decl}
                    zz = CheckingInOut(env)
                    zz.process(p_cust)
                    member_category[p_cust.name] = f_rr1.m_cat
                    inputs_p_node, outputs_p_node = redefine_params(p_cust,var_, member_category,zz.inputs, zz.outputs)
                    p_cust.params = inputs_p_node
                    res_inout[p_cust.name]["inputs"] = inputs_p_node
                    if p_cust.return_type == "Void":
                        # The return type is set on p_cust, the node whose
                        # return type is tested and read (it used to be set
                        # on lv in the tuple case).
                        res_inout[p_cust.name]["outputs"] = _append_implicit_return(p_cust, outputs_p_node)
                    p_cust.params = p_cust.params + undeclared
                    p_cust.block.insert(0, dr.declarations)
                    r.append(p_cust)
                    func_names.append(p_cust.name) # to avoid duplicating dependant functions in different auxiliary functions
                frontier = exs

            # One .pyx per auxiliary function, holding it and its dependencies.
            _write_pyx(cs_cyml.Cs_Cyml_ast(r), Path(os.path.join(cyml_rep, "%s.pyx"%(name))))

        rr, vv = translate(total_tree, z.dclassdict, algo.block, params_not_declared_, res_inout, member_category, instance_dclass)
        env = {d.name:d.pseudo_type for j in rr.declarations for d in j.decl}
        zz = CheckingInOut(env)
        zz.process(vv)
        z.modelunit(description, var_, all_var_pa,var, list(set(zz.inputs)), list(set(zz.outputs)))
        # funcs is already filtered for empty entries above.
        z.model.function = [fn.name for fn in funcs]
        models.append(z.model)
        _write_pyx(cs_cyml.Cs_Cyml_ast(rr.declarations + vv, var =var), Path(os.path.join(cyml_rep, "%s.pyx"%(straNames[k]))))
        xml_ = Pl2Crop2ml(z.model, "Crop2ML."+pkg).run_unit()
        _write_xml(
            Path(os.path.join(crop2ml_rep, "unit.%s.xml"%(straNames[k]))),
            '<!DOCTYPE ModelUnit PUBLIC " " "https://raw.githubusercontent.com/AgriculturalModelExchangeInitiative/crop2ml/master/ModelUnit.dtd">\n',
            xml_,
        )

    # NOTE(review): z is the extractor of the last strategy; when no strategy
    # was found this line raises NameError, as it did before - confirm the
    # intended behaviour for components without strategies.
    z.modelcomposition(models,compo)
    xml_ = Pl2Crop2ml(z.mc, "Crop2ML."+pkg).run_compo()
    name = z.mc.name[:-9] if z.mc.name.endswith("Component") else z.mc.name
    _write_xml(
        Path(os.path.join(crop2ml_rep, "composition.%s.xml"%(name))),
        '<!DOCTYPE ModelComposition PUBLIC " " "https://raw.githubusercontent.com/AgriculturalModelExchangeInitiative/crop2ml/master/ModelComposition.dtd">\n',
        xml_,
    )