Source code for pycropml.transpiler.antlr_py.csharp.run

# coding: utf-8

from __future__ import absolute_import
from __future__ import print_function
import os
from os.path import isdir
from copy import deepcopy
from typing import *
from path import Path

import networkx as nx
import itertools

from pycropml.transpiler.antlr_py.to_CASG import to_dictASG, to_CASG
from pycropml.transpiler.antlr_py.csharp.csharpExtraction import CsharpExtraction
from pycropml.transpiler.pseudo_tree import Node
from pycropml.transpiler.antlr_py.csharp import cs_cyml
from pycropml.transpiler.generators.cymlGenerator import CymlGenerator
from pycropml.transpiler.ast_transform import transform_to_syntax_tree
from pycropml.transpiler.antlr_py.generateCyml import writeCyml
from pycropml.transpiler.antlr_py.createXml import Pl2Crop2ml
from pycropml.transpiler.antlr_py import repowalk

from pycropml.transpiler.antlr_py.csharp.csharp_preprocessing import *
from pycropml.transpiler.antlr_py.codeExtraction import extraction
from pycropml.transpiler.antlr_py.extract_metadata_from_comment import extract
from pycropml.transpiler.antlr_py.api_declarations import Middleware
from pycropml.transpiler.antlr_py.to_specification import extractMetaInfo, createObjectModel, extractcomments, createObjectCompo

from copy import deepcopy, copy

description_tags = ["//%%CyML Description Begin%%", "//%%CyML Description End%%"]


""" Read BioMA component and extract metadata

"""

type_={
    "DOUBLE":"double",
    "INT":"int",
    "STRING":"string",
    "DATE":"date",
    "DOUBLELIST":"List",
    "INTLIST":"List",
     "STRINGLIST":"List",
     "DATELIST":"List",
    "BOOLEAN":"bool",
    "DOUBLEARRAY":"array",
    "INTARRAY":"array",
    "STRINGARRAY":"array"}

pseudo_type_={
    "DOUBLE":"double",
    "INT":"int",
    "STRING":"string",
    "DATE":"date",
    "DOUBLELIST":["List", "double"],
    "INTLIST":["List", "int"],
    "STRINGLIST":["List", "string"],
    "DATELIST":["List", "date"],
    "BOOLEAN":"bool",
    "DOUBLEARRAY":["array", "double"],
    "INTARRAY":["array", "int"],
    "STRINGARRAY":["array", "string"]}
    
    
[docs] class Custom_call2(Middleware): def __init__(self, vars, extfunc = [], not_declared={}, ext_func_inout={}, member_category={}): self.vars = vars self.extfunc = extfunc self.not_declared = not_declared self.ext_func_inout = ext_func_inout self.member_category = member_category Middleware.__init__(self)
[docs] def process(self, tree): return self.transform(tree,in_block=False)
[docs] def action_custom_call(self, tree): method = tree.function if "namespace" in dir(tree): namespace = tree.namespace name = namespace.name args = tree.args receiver_type = namespace.pseudo_type rec = {"type":"local", "name":name, "pseudo_type":receiver_type} receiver_type = receiver_type[0] if isinstance(receiver_type, list) else receiver_type if receiver_type in METHOD_API: api = METHOD_API.get(receiver_type,{}).get(method) if api: tree = api.expand([rec]+ args) if "message" in tree and tree["message"] not in ("contains?", "index", "copyto"): tree = transform_to_syntax_tree({"type": 'ExprStatNode', 'expr': tree}) else: return tree if not api: raise translation_error('CyMLT doesn\' t support %s %s ' % (receiver_type, method), suggestions='CyMLT supports those %s functions\n %s' % ( name, prepare_table(TYPED_API[receiver_type], ORIGINAL_METHODS.get(receiver_type)).strip())) if self.extfunc: for f in self.extfunc: if f.name == method: trees = copy(tree) args = [] z = CsharpExtraction() inps = self.ext_func_inout[f.name]["inputs"] if not isinstance(inps, list): inps = [inps] meth_member_category = self.member_category[f.name] if meth_member_category: for num, arg in enumerate(tree.args): if isinstance(arg.pseudo_type, Node) : restricted_inputs = [ins for ins in inps if ("position_args" in dir(ins) and ins.position_args == num+1)] for k in restricted_inputs: k.type = "local" args.append(k) else: args.append(arg) if args: tree.args = args if len(trees.args) != len(inps): trees.args.extend(inps[len(trees.args):]) for num1, ar1 in enumerate(trees.args): for num2, ar2 in enumerate(inps): if num1 == num2: if ((ar1.type == "int" ) or (ar1.type=="local" and ar1.pseudo_type=="int")) and ar2.pseudo_type in ["float", "double"] : trees.args[num1] = Node(type="standard_method_call", receiver=trees.args[num1], args=[], message="float", pseudo_type="float") outs = deepcopy(self.ext_func_inout[f.name]["outputs"]) if outs: if outs.type=="local" and "pos" in dir(outs): reso = copy(outs) reso.name = trees.args[outs.pos].name res = transform_to_syntax_tree({"type": 'assignment', "target":reso,'op':"=", 'value': trees}) return res else: if outs.type == "Tuple": reso = deepcopy(outs) for j, o in enumerate(reso.elements): if "pos" in dir(o): reso.elements[j].name = copy(trees.args)[o.pos].name return transform_to_syntax_tree({"type": 'assignment', "target":reso,'op':"=", 'value': trees}) else: return transform_to_syntax_tree({"type": 'assignment', "target":outs,'op':"=", 'value': trees}) z.getTypeNode(f, "implicit_return") if not z.getTree: print("TODODODODODODO") return transform_to_syntax_tree({"type": 'ExprStatNode', 'expr': trees}) return self.transform_default(tree)
[docs] class CheckingInOut2(Middleware): """_summary_ This code defines a middleware class called "CheckingInOut" that checks the inputs and outputs of a given code block. It does this by keeping track of the current scope and environment, and adding any new variables to the environment. It also keeps track of the inputs and outputs of the code block by checking if a variable is already in the current scope or not. The middleware class has methods for different types of statements and expressions, such as assignment, if statements, for loops, and function calls. The purpose of this middleware is to ensure that the inputs and outputs of a code block are well-defined and can be used by other parts of a program. """ def __init__(self, env_init=None, isAlgo=False): if env_init is None: self.env_init = {} self.env_init = env_init self.env = [self.env_init] self.current_scope = {} self.if_scope = None self.isAlgo = isAlgo self.inputs = [] self.outputs = [] self.newdecl = [] Middleware.__init__(self)
[docs] def process(self, tree): self.current_scope = self.current() #self.name = tree.name return self.transform(tree,in_block=False)
[docs] def current(self): return {k: v for d in self.env for k, v in d.items()}
[docs] def workflow(self, tree): self.env.append({}) r = self.transform_default(tree) self.env.pop() self.current_scope = self.current() return r
[docs] def action_assignment(self, tree): self.transform(tree.value) if tree.target.type == "local": t_name = tree.target.name type_ = tree.target.pseudo_type if t_name not in self.current_scope and not self.isAlgo: # self.env[-1] #self.inputs.append(t_name) self.newdecl.append({t_name:type_}) self.env[-1][t_name] = type_ self.outputs.append(t_name) elif "sequence" in dir(tree.target): t_name = tree.target.sequence.name type_ = tree.target.sequence.pseudo_type if t_name not in self.current_scope and not(isinstance(type_, list) and type_[0]=="array"): self.inputs.append(t_name) self.env[-1][t_name] = type_ self.outputs.append(t_name) elif tree.target.type == "sliceindex": t_name = tree.target.receiver.name type_ = tree.target.receiver.pseudo_type if t_name not in self.current_scope: self.inputs.append(t_name) self.env[-1][t_name] = type_ self.outputs.append(t_name) else: for elem in tree.target.elements: t_name = elem.name if "name" in dir(elem) else elem.sequence.name type_ = elem.pseudo_type self.outputs.append(t_name) self.env[-1][t_name] = type_ if "op" in dir(tree) and tree.op != "=": if t_name not in self.current_scope: self.inputs.append(t_name) self.current_scope = self.current() return tree
'''def action_member_access(self, tree): newvar = tree.name + "_" + tree.member tree = Node(type="local", name=newvar, pseudo_type=tree.pseudo_type) if tree.name not in self.current_scope:# and tree.name not in self.outputs: self.inputs.append(tree.name) self.env[-1][tree.name] = tree.type self.current_scope = self.current() return tree'''
[docs] def action_custom_call(self, tree): if "namespace" in dir(tree): self.transform(tree.namespace) for arg in tree.args: self.transform(arg) return tree
[docs] def action_standard_method_call(self, tree): if isinstance(tree.receiver.pseudo_type, list) and tree.message in ["append", "sum", "remove", "extend", "insert"]: t_name = tree.receiver.name if t_name not in self.current_scope: self.inputs.append(t_name) self.env[-1][t_name] = tree.receiver.pseudo_type self.outputs.append(t_name) self.current_scope = self.current() self.transform_default(tree) return tree
[docs] def action_declaration(self, tree): for d in tree.decl: if "value" in dir(d): self.transform_default(d.value) self.env[-1][d.name] = d.type self.current_scope = self.current() return tree
[docs] def action_local(self, tree): if tree.name not in self.current_scope:# and tree.name not in self.outputs: self.inputs.append(tree.name) self.env[-1][tree.name] = tree.type self.current_scope = self.current() return tree
[docs] def action_if_statement(self, tree): self.env.append({}) self.transform(tree.test) self.transform(tree.block) m1 = self.env[-1] self.env.pop() self.current_scope = self.current() if tree.otherwise: self.env.append({}) self.transform(tree.otherwise) m2 = self.env[-1] self.env.pop() common_keys = m1.keys( ) & m2.keys() common_dict = {k:m1[k] for k in common_keys} self.env[-1].update(common_dict) self.current_scope = self.current() return tree
[docs] def action_implicit_return(self, tree): self.transform(tree.value) return tree
[docs] def action_binary_op(self, tree): self.transform(tree.left) self.transform(tree.right) return tree
[docs] def action_for_iterator(self, tree): iterator = tree.iterator.name if iterator not in self.current_scope: self.env[-1][iterator] = tree.iterator.pseudo_type self.current_scope = self.current() return tree
[docs] def action_for_statement(self, tree): return self.workflow(tree)
[docs] def action_for_range_statement(self, tree): return self.workflow(tree)
[docs] def action_while_statement(self, tree): return self.workflow(tree)
[docs] def action_standard_call(self, tree): for arg in tree.args: self.transform(arg) return tree
[docs] class Local2(Middleware): def __init__(self, declnames=[], params={}): self.declnames = declnames self.not_declared=[] self.params = params Middleware.__init__(self)
[docs] def process(self, tree): return self.transform(tree,in_block=False)
[docs] def action_local(self, tree): if tree.name not in self.declnames: self.not_declared.append(str(tree.name)) if tree.name in self.params: pseudo_type = self.params[tree.name] tree.pseudo_type = pseudo_type return tree
[docs] class Member_access2(Middleware): def __init__(self, totaltree, prec_cur=[]): self.totaltree = totaltree self.members = [] self.prec_cur = prec_cur self.m_cat = {} Middleware.__init__(self)
[docs] def process(self, tree): return self.transform(tree,in_block=False)
[docs] def test_action_member_access(self, tree): if "." in tree.member and tree.name + "_" + tree.member.split(".")[0] in self.all_var : propert = tree.member.split(".")[-1] name = tree.name + "_" + tree.member.split(".")[0] type_ = pseudo_type[self.all_var[name]] if isinstance(type_, list): type_ = type_[0] pseudo_type = self.all_var[name] if type_ in PROPERTY_API: rec = {"type":"local", "name": name, "pseudo_type":pseudo_type} api = PROPERTY_API[type_].get(propert) if not api: raise PseudoCythonTypeCheckError("Not implemented property , %s"%propert) elif not isinstance(api, dict): z = api.expand([rec]) return transform_to_syntax_tree(z) member = tree.member.replace(".", "_") name = tree.name + "_" + member pseudo = tree.pseudo_type tree.name = name self.members.append(tree) # retrieve the class name # retrieve the corresponding node in the totaltree # dtype: retrieve the datatype wwith the variable name # return {"type":"local", "name":name, "pseudo_type":dtype} if pseudo is None: return Node(type="local", name= name) else: return Node(type = "local", name =name, pseudo_type = pseudo_type_[self.all_var[name]])
[docs] def action_member_access(self, tree): name = tree.member if "." in name: name = name.split('.')[0] print("member accessssssssssssssssssssssss", tree.y) pseudo = tree.pseudo_type self.members.append(tree) # retrieve the class name # retrieve the corresponding node in the totaltree # dtype: retrieve the datatype wwith the variable name # return {"type":"local", "name":name, "pseudo_type":dtype} if pseudo is None: return Node(type="local", name= tree.member.split(".")[-1]) classname = pseudo.pseudo_type if isinstance(pseudo, Node) else pseudo z = BiomaExtraction() z.getTypeNode(self.totaltree, "classDef") classNode = [m for m in z.getTree if m.name == classname] z.getTypeNode(classNode[0], "propertyDef") propNode = [m for m in z.getTree if m.name == name] if not propNode: return Node(type = "local", name =name, pseudo_type = pseudo) pseudo_prop = propNode[0].pseudo_type if isinstance(pseudo_prop, Node): pseudo_type = pseudo_prop.pseudo_type else: pseudo_type = pseudo_prop if isinstance(pseudo_type, list): type_ = pseudo_type[0] else: type_ = pseudo_type if "." in tree.member: v = tree.member.split('.') propert = v[1] name = from_attr_to_var(tree.name, v[0], self.prec_cur) self.m_cat[name] = tree.name if type_ in PROPERTY_API: rec = {"type":"local", "name": name, "pseudo_type":pseudo_type} api = PROPERTY_API[type_].get(propert) if not api: raise PseudoCythonTypeCheckError("Not implemented property , %s"%propert) elif not isinstance(api, dict): z = api.expand([rec]) return transform_to_syntax_tree(z) else: if self.prec_cur: name = from_attr_to_var(tree.name, tree.member, self.prec_cur) self.m_cat[name] = tree.name else: name = tree.member return Node(type = "local", name =name, pseudo_type = pseudo_type) return tree
[docs] class For_statement2(Middleware): def __init__(self): Middleware.__init__(self) self.declared = []
[docs] def process(self, tree): return self.transform(tree,in_block=False)
[docs] def action_for_statement(self, tree): """_summary_ {'type': 'for_sequence', 'sequence': {'type': 'local', 'name': 'soilConstituentNames', 'pseudo_type': ['array', 'string']}}, 'iterators': {'type': 'for_iterator', 'iterator': {'type': 'local', 'name': 'constituentName', 'pseudo_type': 'var'}} """ if tree.sequences.sequence.type == "ExprStatNode": if tree.sequences.sequence.expr.type == "standard_method_call" and tree.sequences.sequence.expr.message == "except": tree = Node(type="for_statement", sequences=Node(type='for_sequence', sequence = tree.sequences.sequence.expr.receiver), iterators=tree.iterators, block=Node(type ='if_statement', test = Node(type ='standard_method_call', receiver = Node(type = "List", pseudo_type = ["List",tree.sequences.sequence.expr.args[0].pseudo_type[-1]], elements = [r.init.value[0] for r in tree.sequences.sequence.expr.args]), message ='not contains?', args =[tree.iterators.iterator], pseudo_type = 'Boolean'), block = tree.block, otherwise = [])) iterator_pseudo_type = tree.iterators.iterator.pseudo_type if iterator_pseudo_type == "var": tree.iterators.iterator.pseudo_type = tree.sequences.sequence.pseudo_type[-1] self.declared.append(tree.iterators.iterator.name) return tree
[docs] def create_package(output): crop2ml_rep = Path(os.path.join(output, 'crop2ml')) if not isdir(crop2ml_rep): crop2ml_rep.mkdir() algo_rep = Path(os.path.join(crop2ml_rep, 'algo')) if not isdir(algo_rep): algo_rep.mkdir() cyml_rep = Path(os.path.join(algo_rep, 'pyx')) if not isdir(cyml_rep): cyml_rep.mkdir() return crop2ml_rep, cyml_rep
[docs] def function_dependency(st, f): r = [f] z = CsharpExtraction() while True: f = f if isinstance(f, list) else [f] f_exts = [z.externFunction(st, n, False, n.name) for n in f] exts = list(itertools.chain(*f_exts)) exs = [i for i in exts if i ] if exs: for ex in exs: r.append(ex) f = exs else: break return r
[docs] def redefine_params(m:Node, var_:Dict, member_category, inputs, outputs, extfunc, instance_dclass)->List[Node]: """It allows to change all parameters which are instance of domain class with the explicit attributes required Args: m (Node): Auxiliary function ASG var_ (dict): Metadata from strategy classes and varinfo files: (inputs, parameters, outputs) member_category (_type_): _description_ inputs (List[str]): Parameter names of the auxiliary function. It can be an instance of domain class Returns: List[Node]: New parameters nodes """ res = [] name = m.name inputs_p_node = [] outputs_p_node = [] inps = list(set(inputs)) pseudo_name = [] pos = 0 reso = [] inpnames = {p.name:p.pseudo_type for p in m.params} for p in m.params: pos = pos + 1 if p.pseudo_type not in list(pseudo_type_.values()) : pseudo = p.pseudo_type.pseudo_type if isinstance(p.pseudo_type, Node) else p.pseudo_type pseudo_name.append(pseudo) # find the inputs whose instance is p.name inputs_p = [key for key, val in member_category[name].items() if val==p.name] #inputs_p = inps # find its datatype for p_ in inps: if p_ in inputs_p: vn = p_[:-3] if (len(p_)>3 and p_.endswith("_t1")) else p_ for k, m_inp in var_.items(): if k == vn or (k == p_): inputs_p_node.append(Node(type=type_[m_inp['ValueType']], name=p_, pseudo_type=pseudo_type_[m_inp['ValueType']], position_args = pos)) res.append(name) break else: indice = 1 if pseudo_name.count(pseudo)>1 else 0 for f in extfunc: test = False dclass = deepcopy(instance_dclass[f.name]) meth_member_category = member_category[f.name] if meth_member_category: var = dclass[pseudo][indice] inputs_names = [key for key, val in meth_member_category.items() if val==var] if p_ in inputs_names: vn = p_[:-3] if (len(p_)>3 and p_.endswith("_t1")) else p_ for k, m_inp in var_.items(): if k == vn or (k == p_): test = True res.append(name) inputs_p_node.append(Node(type=type_[m_inp.datatype], name=p_, pseudo_type=pseudo_type_[m_inp.datatype], position_args = pos)) break if test: break else: if p.name not in res and p.name in inputs: inputs_p_node.append(p) res.append(p.name) if isinstance(p.pseudo_type, list) and p.pseudo_type[0] in ("array", "List", "list") and p.name in outputs and p.name not in reso: outputs_p_node.append(Node(type= "local", name=p.name, pseudo_type=p.pseudo_type)) reso.append(p.name) # This case because in C# an input with datatype list or array is passed by reference outs = list(set(outputs)) for p_ in outs: vn = p_[:-3] if (len(p_)>3 and p_.endswith("_t1")) else p_ for k, m_inp in var_.items(): if (k == vn or (k == p_)) and p_ not in reso: outputs_p_node.append(Node(type= "local", name=p_, pseudo_type=pseudo_type_[m_inp.datatype])) reso.append(p_) break if p_ not in reso and p_ in inpnames: outputs_p_node.append(Node(type= "local", name=p_, pseudo_type=inpnames[p_])) reso.append(p_) return inputs_p_node, outputs_p_node
from collections import defaultdict
[docs] def inst_dclass(meth): params = meth.params lst = [] for p in params: if isinstance(p.pseudo_type, Node) and "typename" in dir(p.pseudo_type): pname = str(p.name) lst.append((p.pseudo_type.pseudo_type, pname)) elif isinstance(p.pseudo_type, str): lst.append((p.pseudo_type, str(p.name))) else: lst.append((p.pseudo_type[1].upper() + p.pseudo_type[0].upper(), str(p.name))) orDict = defaultdict(list) # iterating over list of tuples for key, val in lst: orDict[key].append(val) dclassdict = dict(orDict) return dclassdict
[docs] def translate(total_tree, varinfo, algo, not_declared, res_inout={}, member_category={}, pa={}): """Transform specific nodes based on class of subnodes of node. It also allows to extract some usefull information. At finish the modified node contains only the constructs of CyML and converted in CyML after applying translate_simple function. Args: total_tree (Node): ASG of all the component varinfo (Node): ASG of the Var info files Returns: ASG: transform algo """ z = CsharpExtraction() funcs = z.externFunction(total_tree, algo) funcs = [f for f in funcs if f] rr1 = Member_access2(total_tree, varinfo) vv1 = rr1.process(algo) rr1_ = Local2(params=pa) vv1_ = rr1_.process(vv1) ri2 = Index() vi2= ri2.process(vv1_) rr2 = Binary_op() vv2= rr2.process(vi2) rr3 = Assignment() vv3 = rr3.process(vv2) rr = Declarations() vv = rr.process(vv3) rr4 = Custom_call2(total_tree, funcs, not_declared, res_inout, member_category) vv4 = rr4.process(vv) expr = ExprStatNode(funcs, res_inout) expr_ = expr.process(vv4) return rr, expr_
[docs] def translate_(f, pa): res = [] res_ = [] dr = Declarations() dv = dr.process(f) lr = Local(dr.declnames) lv = lr.process(dv) params = [str(p.name) for p in f.params] args = params + dr.declnames not_declared = list(set(lr.not_declared) - set(args)) for n in not_declared: for m in pa: if m["Name"].decode("utf-8") == n: r = Node(type=type_[m['ValueType']], name=n, pseudo_type=pseudo_type_[m['ValueType']]) res.append(r) r.type = "local" res_.append(r) break return lv, dr.declarations, res_
[docs] def run_csharp(component, output): """Transform a CSharp component in Crop2ML Args: component (_type_): csharp component path output (_type_): Crop2ML package path """ crop2ml_rep, cyml_rep = create_package(output) pkg = os.path.split(component)[-1].replace('-', '_') files = repowalk.walk(component, "cs" ) res = {} stra = {} straNames = [] varinfo = {} dclass = [] compo = {} source_codes=[] compo_codes = [] for k, v in files.items(): with open(v, 'r') as f: code = f.read() if code : if code.startswith(""): code = code[3:] splitcode = code.split('\n') zz = map(lambda x: x.lstrip(), splitcode) codelist = [n for n in zz if not n.startswith("#") ] code = "\n".join(codelist) dictasgt = to_dictASG(code,"cs") strAsg = to_CASG(dictasgt) res[k] = strAsg print("Processing file:", v) print("===================================" ) m = CsharpExtraction() m.getTypeNode(strAsg, "classDef") g= m.getTree n = m.getmethod( g, "CalculateModel") if n: if "Component" in v.split(os.sep)[-1]: compo[k] = strAsg compo_codes.append(code) else: stra[k]=strAsg straNames.append(g[0].name) source_codes.append(code) total_tree = list(res.values()) #vinfo = list(varinfo.values()) strats = list(stra.values()) compos = list(compo.values()) models = [] func_names = [] kk = CsharpExtraction() all_var = kk.getAllVar(source_codes) for k, st in enumerate(strats): print(k, st) mod = source_codes[k] z = CsharpExtraction(code=mod) var = z.totalvar(st) algo = z.getAlgo(st) init_ = z.getInit(st) funcs = z.externFunction(st, algo.block + init_.block, False) if init_ else z.externFunction(st, algo.block, False) funcs = [f for f in funcs if f] commentsPart = extraction(mod, description_tags[0], description_tags[1]) mdata = extract(commentsPart[0]+"\n\n") strat_var = z.getStrategyVar() pa = strat_var[0] dict_pa = {f.name:f for f in pa} all_var_pa = {**dict_pa, **all_var} # all the variable from all varinfo files and parameters of the specific strategy. #pa = mdata.inputs params_not_declared = {} params_not_declared_ = {} decl = {} member_category = {} # member_category[func1] = {member1:instance1, member2: instance1, ...} res_inout = {} instance_dclass = {} if funcs: for f in funcs: r = [] # order of function dependency f_dep = function_dependency(st, f) dep_ = list(reversed(f_dep)) dep = [] dep_names = [] for d in dep_: if d.name not in dep_names: dep.append(d) dep_names.append(d.name) for ex in dep: # dep is the list of external function in the order of dependency if ex.name not in func_names: # to avoid duplicating dependent functions in different auxiliary functions func = z.externFunction(total_tree, ex, False, ex.name) extfunc = [p for p in func if p] if extfunc and isinstance(extfunc[0], list): extfunc = list(itertools.chain(*extfunc)) for rr in extfunc: if ex.class_!= rr.class_: rr.name = "_" + rr.class_ + "__" + rr.name +"_" params_not_declared[rr.name] = [] res = [] res_ = [] res_inout[ex.name] = {"inputs":None, "outputs":None} dclassdict = inst_dclass(ex) f_rr1 = Member_access2(total_tree, dclassdict) f_vv1 = f_rr1.process(ex) member_category[ex.name] = f_rr1.m_cat ri2 = Index() vi2= ri2.process(f_vv1) dr = Declarations() dv = dr.process(vi2) lr = Local2(dr.declnames) lv = lr.process(dv) params = [str(p.name) for p in lv.params] args = params + dr.declnames not_declared = list(set(lr.not_declared) - set(args)) for n in not_declared: for m in pa: if m.name.encode("utf-8") == n: tt = Node(type=type_[m.datatype], name=n, pseudo_type=pseudo_type_[m.datatype]) res.append(tt) tt.type = "local" res_.append(tt) break params_names_not_declared = [p.name for p in res] for fc in extfunc: params_t = params_not_declared[fc.name] for p in params_t: if p.name not in params_names_not_declared: res.append(p) res_.append(p) params_not_declared[lv.name] = res params_not_declared_[lv.name] = res_ decl[lv.name] = dr.declarations name = lv.name instance_dclass[name] = inst_dclass(lv) # before changing the signature lv.params = lv.params + params_not_declared[name] lv.block.insert(0,decl[name]) trans_local = TransformLocal(params_not_declared[name]) r_trans_local = trans_local.process(lv) rr2 = Binary_op() vv2= rr2.process(r_trans_local) rr4 = Custom_call2(total_tree, extfunc, params_not_declared_, res_inout, member_category) p_cust = rr4.process(vv2) env = {xx.name:xx.pseudo_type for j in decl[name] for xx in j.decl} zz = CheckingInOut2(env) r_ch = zz.process(p_cust) inputs_p_node, outputs_p_node = redefine_params(p_cust,all_var_pa, member_category,zz.inputs, zz.outputs,extfunc, instance_dclass) p_cust.params = inputs_p_node res_inout[name]["inputs"] = inputs_p_node params = {p.name:p.pseudo_type for p in p_cust.params} lr = Local2(declnames=[], params=params) lv = lr.process(p_cust) outputs_node = outputs_p_node # transform_io(zz.outputs, all_var, True) newinps = {p.name:i for i,p in enumerate(inputs_p_node)} if lv.return_type == "Void": if len(outputs_node) == 1: lv.return_type = outputs_node[0].pseudo_type return_ = Node(type = "implicit_return", value = Node(type = "local", pseudo_type = p_cust.return_type, name = outputs_node[0].name)) if outputs_node[0].name in newinps.keys(): return_ = Node(type = "implicit_return", value = Node(type = "local", pseudo_type = p_cust.return_type, name = outputs_node[0].name, pos = newinps[outputs_node[0].name])) else: lv.return_type = ["Tuple"] + [n.pseudo_type for n in outputs_node] return_ = Node(type = "implicit_return", value = Node(type = "Tuple", pseudo_type = p_cust.return_type, elements = outputs_node)) lv.block.append(return_) res_inout[name]["outputs"] = return_.value else: if "modifiers" in dir(lv.block[-1]): return_ = lv.block[-1].value if return_.type == "local" and return_.name in newinps.keys(): elts = [] elts.append(Node(type = "local", pseudo_type = lv.return_type, name = return_.name, pos = newinps[return_.name])) for o in outputs_node: if o.name in newinps.keys() and o.name != return_.name and o.name in all_var and o.name not in dr.declnames: elts.append(Node(type = "local", pseudo_type = o.pseudo_type, name = o.name, pos = newinps[o.name])) elif o.name != return_.name and o.name in all_var and o.name not in dr.declnames: elts.append(o) if len(elts) == 1: return_ = elts[0] else: return_ = Node(type = "Tuple", pseudo_type = ["Tuple"]+[e.pseudo_type for e in elts], elements = elts) lv.block[-1] = Node(type = "implicit_return", value = return_) else: elts = [] r_outs = [o.name for o in return_.elements] for o in outputs_node: if o.name in r_outs and o.name in newinps.keys(): elts.append(Node(type = "local", pseudo_type = o.pseudo_type, name = o.name, pos = newinps[o.name])) elif o.name in r_outs: elts.append(Node(type = "local", pseudo_type = o.pseudo_type, name = o.name)) elif o.name not in r_outs: elts.append(o) return_ = Node(type = "Tuple", pseudo_type = lv.return_type, elements = elts) lv.block[-1] = Node(type = "implicit_return", value = return_) lv.return_type = ["Tuple"] + [n.pseudo_type for n in outputs_node] res_inout[name]["outputs"] = return_ r.append(lv) func_names.append(lv.name) cd = cs_cyml.Cs_Cyml_ast(r) h = cd.transform() nd = transform_to_syntax_tree(h) code = writeCyml(nd) filename = Path(os.path.join(cyml_rep, "%s.pyx"%(name))) with open(filename, "wb") as tg_file: tg_file.write(code.encode('utf-8')) rr, vv = translate(total_tree, z.dclassdict, algo.block, params_not_declared_, res_inout, member_category, dict_pa) zz = CheckingInOut2( {},isAlgo = True) r_ch = zz.process(vv) cd = cs_cyml.Cs_Cyml_ast(rr.declarations + vv, var =var) h = cd.transform() nd = transform_to_syntax_tree(h) code = writeCyml(nd) filename = Path(os.path.join(cyml_rep, "%s.pyx"%(straNames[k]))) with open(filename, "wb") as tg_file: tg_file.write(code.encode('utf-8')) dict_init = {} inps_init = [] outs_init = [] if init_: rr_, init_pseudo = translate(total_tree, z.dclassdict, init_.block, params_not_declared_, res_inout, member_category, dict_pa) dict_init = {} name_i = "init."+straNames[k] dict_init["name"] = "init" dict_init["filename"] = "algo/pyx/" + name_i + ".pyx" #z.model.initialization = [dict_init] cd = cs_cyml.Cs_Cyml_ast(rr_.declarations + init_pseudo, var =var) h = cd.transform() nd = transform_to_syntax_tree(h) initcode = writeCyml(nd) filename = Path(os.path.join(cyml_rep, "init.%s.pyx"%(straNames[k]))) with open(filename, "wb") as tg_file: tg_file.write(initcode.encode('utf-8')) zz2 = CheckingInOut2( {},isAlgo = True) r_ch = zz2.process(init_pseudo) inps_init = zz2.inputs outs_init = zz2.outputs inps_str = zz.inputs + inps_init outs_str = zz.outputs + outs_init z.modelunit(mdata, strat_var, all_var_pa,var, list(set(inps_str)), list(set(outs_str))) z.model.function = [n.name for n in funcs if f] if dict_init: z.model.initialization = [dict_init] models.append(z.model) xml_ = Pl2Crop2ml(z.model, "Crop2ML."+pkg).run_unit() filename = Path(os.path.join(crop2ml_rep, "unit.%s.xml"%(straNames[k]))) with open(filename, "wb") as xml_file: #xml_file.write(xml_.unicode(indent=4).encode('utf-8')) r = '<?xml version="1.0" encoding="UTF-8"?>\n' r += '<!DOCTYPE ModelUnit PUBLIC " " "https://raw.githubusercontent.com/AgriculturalModelExchangeInitiative/crop2ml/master/ModelUnit.dtd">\n' r += xml_.unicode(indent=4)#.encode('utf-8') xml_file.write(r.encode()) for k, compo in enumerate(compos): mod = compo_codes[k] commentsPart = extraction(mod, description_tags[0], description_tags[1]) mdatac = extract(commentsPart[0]+"\n\n") z.modelcomposition(models,compo, mdatac) xml_ = Pl2Crop2ml(z.mc, "Crop2ML."+pkg).run_compo() name = z.mc.name[:-9] if z.mc.name.endswith("Component") else z.mc.name filename = Path(os.path.join(crop2ml_rep, "composition.%s.xml"%(name))) with open(filename, "wb") as xml_file: #xml_file.write(xml_.unicode(indent=4).encode('utf-8')) r = '<?xml version="1.0" encoding="UTF-8"?>\n' r += '<!DOCTYPE ModelComposition PUBLIC " " "https://raw.githubusercontent.com/AgriculturalModelExchangeInitiative/crop2ml/master/ModelComposition.dtd">\n' r += xml_.unicode(indent=4)#.encode('utf-8') xml_file.write(r.encode())
import io
[docs] def is_filehandle(x): return isinstance(x, io.IOBase) # catches TextIOWrapper, BufferedReader, etc.
[docs] def find_filehandles(obj, path="root", seen=None): if seen is None: seen = set() oid = id(obj) if oid in seen: return seen.add(oid) if is_filehandle(obj): print("FILEHANDLE at", path, "->", repr(obj), "name=", getattr(obj, "name", None)) return # Recurse if hasattr(obj, "__dict__"): for k, v in vars(obj).items(): find_filehandles(v, f"{path}.{k}", seen) elif isinstance(obj, dict): for k, v in obj.items(): find_filehandles(v, f"{path}[{k!r}]", seen) elif isinstance(obj, (list, tuple, set)): for i, v in enumerate(obj): find_filehandles(v, f"{path}[{i}]", seen)