Files
aufbau2csv/idoit_scaleup/category.py
Sven Riwoldt e06c448a46 Idoit-Tests 01
2024-12-15 14:38:48 +01:00

253 lines
9.2 KiB
Python
Executable File

from pprint import pprint
from .base import IDoitApiBase
from copy import deepcopy
import json
class IDoitCategory(IDoitApiBase):
def __init__(self, cfg, obj_type: str):
super().__init__(cfg)
self.obj_type = obj_type
params = {
'category': obj_type
}
r = self.xml_rpc_call('cmdb.category_info', params)
self.fields = {}
for fieldname in r['result'].keys():
default = None
if 'default' in r['result'][fieldname]['ui']:
default = r['result'][fieldname]['ui']['default'] or None
self.fields[fieldname] = {
'data_type': r['result'][fieldname]['data']['type'],
'info_type': r['result'][fieldname]['info']['type'],
'ui_type': r['result'][fieldname]['ui']['type'],
'default': default,
'title': r['result'][fieldname]['info']['title']
}
def is_dialog_plus_field(self, fieldname):
return (self.fields[fieldname]['info_type'] == 'dialog_plus')
def save_category(self, objId, data):
sdata = deepcopy(data)
if "_data" in sdata.keys():
del (sdata["_data"])
params = {
"object": objId,
'category': self.obj_type,
'data': sdata}
if self.debug:
print('--Category Save---')
pprint(params)
print('--Category Save---')
return self.xml_rpc_call('cmdb.category.save', params)
def update_category(self, objId, data):
sdata = deepcopy(data)
if "_data" in sdata.keys():
del (sdata["_data"])
if "id" in sdata.keys():
sdata['category_id'] = sdata['id']
del (sdata["id"])
if sdata['category_id'] is None:
raise Exception('category_id is None')
params = {
"objID": objId,
'category': self.obj_type,
'data': sdata}
if self.debug:
print('--Category Update---')
pprint(params)
print('--Category Update---')
return self.xml_rpc_call('cmdb.category.update', params)
def read_categories(self, objId):
params = {
"objID": objId,
'category': self.obj_type,
}
r = self.xml_rpc_call('cmdb.category.read', params)
rtn = []
for item in r['result']:
rtn.append(self.convert_incomming_category(item))
return rtn
def read_category_by_id(self, objId, id):
for cat in self.read_categories(objId):
if cat['id'] == id:
return cat
# not found
return None
def delete_category(self, objId, id):
params = {
"objID": objId,
'category': self.obj_type,
'cateID': id
}
return self.xml_rpc_call('cmdb.category.delete', params)
def read_category(self, objId):
r = self.read_categories(objId)
if len(r) == 0:
return None
assert (len(r) == 1)
return r[0]
def conv_array_field(self, fieldname, data, ref_field):
if len(data[fieldname]) == 0:
return None
if type(data[fieldname]) is dict:
return int(data[fieldname][ref_field])
if len(data[fieldname]) != 1:
raise Exception('Field "%s" has more than one entry %s' %
(fieldname, json.dumps(data)))
return int(data[fieldname][0][ref_field])
def convert_list(self, list):
rtn = []
for ele in list:
rtn.append(int(ele['id']))
if len(rtn) == 0:
return None
return rtn
def convert_field(self, fieldname, data):
object_methods = [method_name for method_name in dir(self)
if callable(getattr(self, method_name))]
field = self.fields[fieldname]
if fieldname in data.keys():
if data[fieldname] is None:
return None
field_method_name = 'convert_field_with_name_%s' % fieldname
if field_method_name in object_methods:
method = getattr(self, field_method_name)
return method(data)
try:
if field['ui_type'] == 'popup':
if field['data_type'] == 'int':
return int(data[fieldname]['id'])
if field['ui_type'] == 'text':
if field['data_type'] == 'int':
return int(data[fieldname])
if field['data_type'] == 'double':
return float(data[fieldname])
if field['data_type'] == 'float':
return float(data[fieldname])
if field['data_type'] == 'link':
return data[fieldname]
if field['data_type'] == 'text':
return data[fieldname]
if field['ui_type'] == 'wysiwyg':
if field['data_type'] == 'text':
return data[fieldname]
if field['ui_type'] == 'textarea':
if field['data_type'] == 'text_area':
return data[fieldname]
if field['ui_type'] == 'dialog':
if field['data_type'] == 'int':
return int(data[fieldname]['id'])
if field['data_type'] == 'text':
return data[fieldname]['id']
if field['ui_type'] == 'f_dialog_list':
if field['data_type'] == 'int':
return self.convert_list(data[fieldname])
if field['ui_type'] == 'datetime':
if field['data_type'] == 'date_time':
try:
rtn=data[fieldname]['title']
except:
rtn=data[fieldname]
return rtn
if field['ui_type'] == 'numeric':
if field['data_type'] == 'text':
return float(data[fieldname])
except:
raise Exception('Wrong conversion ',
self.obj_type, fieldname, field, data)
raise Exception('Unknwown data_type/ ui_type',
self.obj_type, fieldname, field, data)
def convert_incomming_category(self, data):
rtn = {}
for fieldname in self.fields.keys():
rtn[fieldname] = self.convert_field(fieldname, data)
if 'id' in data.keys():
rtn['id'] = int(data['id'])
rtn['_data'] = data
return rtn
def partial_equal(self, obj1, obj2):
if isinstance(obj1, str):
return (obj1 == obj2)
if isinstance(obj1, int):
return (obj1 == obj2)
if obj1 is None and obj2 is None:
return True
if obj1 is None:
return False
if obj2 is None:
return False
if isinstance(obj1, list):
if isinstance(obj2, list):
if len(obj1) != len(obj2):
return False
for ele in obj1:
if ele not in obj2:
return False
return True
else:
return False
for key in obj1.keys():
if key not in obj2:
return False
if not (self.partial_equal(obj1[key], obj2[key])):
return False
return True
# Funktioniert nur wo es genau eine Kategory gibt
def save_category_if_changed(self, objId, data):
oldData = self.read_category(objId)
if not (self.partial_equal(data, oldData)):
print('save', objId, self.obj_type)
pprint(data)
print('old:')
pprint(oldData)
r = self.save_category(objId, data)
print(r)
def update_categorys(self, objId, equals_attr, new_data_arr):
old_data_arr = self.read_categories(objId)
for old_data in old_data_arr:
old_found = False
for new_data in new_data_arr:
if old_data[equals_attr] == new_data[equals_attr]:
old_found = True
if not (self.partial_equal(new_data, old_data)):
print('save', objId, self.obj_type)
pprint(new_data)
print('old:')
pprint(old_data)
new_data['id'] = old_data['id']
r = self.update_category(objId, new_data)
pprint(r)
if not old_found:
print('delete:', objId, self.obj_type)
pprint(old_data)
self.delete_category(objId, old_data['id'])
for new_data in new_data_arr:
new_found = False
for old_data in old_data_arr:
if old_data[equals_attr] == new_data[equals_attr]:
new_found = True
if not new_found:
print('save', objId, self.obj_type)
pprint(new_data)
r = self.save_category(objId, new_data)
pprint(r)