fw_rules_builder/app/app.py

665 lines
24 KiB
Python

"""
app.py — Flask backend для Firewall Rules Builder
"""
import json
import os
import sys
import re
from flask import Flask, jsonify, request, render_template, send_from_directory
app = Flask(__name__, template_folder='templates', static_folder='static')
# ─── Хранилище данных ────────────────────────────────────────────────────────
DATA_FILE = os.path.join(os.path.dirname(__file__), 'data.json')
EMPTY_DATA = {
"servers": {},
"nets": {},
"groups": {},
"services": {},
"service_groups": {},
"rules": []
}
def load_data():
if os.path.exists(DATA_FILE):
with open(DATA_FILE, 'r', encoding='utf-8') as f:
return json.load(f)
return json.loads(json.dumps(EMPTY_DATA))
def save_data(data):
with open(DATA_FILE, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
# ─── Главная страница ─────────────────────────────────────────────────────────
@app.route('/')
def index():
return render_template('index.html')
# ─── API: Объекты (servers + nets) ───────────────────────────────────────────
@app.route('/api/objects', methods=['GET'])
def get_objects():
data = load_data()
result = {}
for k, v in data['servers'].items():
result[k] = v
for k, v in data['nets'].items():
result[k] = v
return jsonify(result)
@app.route('/api/objects', methods=['POST'])
def create_object():
data = load_data()
obj = request.json
key = obj.get('key', '').strip()
if not key:
return jsonify({'error': 'Ключ обязателен'}), 400
obj_type = obj.get('type', 'host')
obj_data = {
'hostname': obj.get('hostname', key),
'ip': obj.get('ip', ''),
'prefix': obj.get('prefix', '24'),
'gw': obj.get('gw', ''),
'domain': obj.get('domain', ''),
'description': obj.get('description', ''),
'type': obj_type,
'affinity': obj.get('affinity', []),
}
if obj_type == 'network':
if key in data['nets']:
return jsonify({'error': f'Объект "{key}" уже существует'}), 400
data['nets'][key] = obj_data
else:
if key in data['servers']:
return jsonify({'error': f'Объект "{key}" уже существует'}), 400
data['servers'][key] = obj_data
save_data(data)
return jsonify({'ok': True, 'key': key})
@app.route('/api/objects/<key>', methods=['PUT'])
def update_object(key):
data = load_data()
obj = request.json
obj_type = obj.get('type', 'host')
obj_data = {
'hostname': obj.get('hostname', key),
'ip': obj.get('ip', ''),
'prefix': obj.get('prefix', '24'),
'gw': obj.get('gw', ''),
'domain': obj.get('domain', ''),
'description': obj.get('description', ''),
'type': obj_type,
'affinity': obj.get('affinity', []),
}
# Удаляем из обоих словарей (тип мог измениться)
data['servers'].pop(key, None)
data['nets'].pop(key, None)
if obj_type == 'network':
data['nets'][key] = obj_data
else:
data['servers'][key] = obj_data
save_data(data)
return jsonify({'ok': True})
@app.route('/api/objects/<key>', methods=['DELETE'])
def delete_object(key):
data = load_data()
data['servers'].pop(key, None)
data['nets'].pop(key, None)
save_data(data)
return jsonify({'ok': True})
# ─── API: Группы объектов ─────────────────────────────────────────────────────
@app.route('/api/groups', methods=['GET'])
def get_groups():
data = load_data()
return jsonify(data['groups'])
@app.route('/api/groups', methods=['POST'])
def create_group():
data = load_data()
grp = request.json
key = grp.get('key', '').strip()
if not key:
return jsonify({'error': 'Ключ обязателен'}), 400
if key in data['groups']:
return jsonify({'error': f'Группа "{key}" уже существует'}), 400
data['groups'][key] = {
'name': grp.get('name', key),
'items': grp.get('items', [])
}
save_data(data)
return jsonify({'ok': True, 'key': key})
@app.route('/api/groups/<key>', methods=['PUT'])
def update_group(key):
data = load_data()
grp = request.json
data['groups'][key] = {
'name': grp.get('name', key),
'items': grp.get('items', [])
}
save_data(data)
return jsonify({'ok': True})
@app.route('/api/groups/<key>', methods=['DELETE'])
def delete_group(key):
data = load_data()
data['groups'].pop(key, None)
save_data(data)
return jsonify({'ok': True})
# ─── API: Сервисы ─────────────────────────────────────────────────────────────
@app.route('/api/services', methods=['GET'])
def get_services():
data = load_data()
return jsonify(data['services'])
@app.route('/api/services', methods=['POST'])
def create_service():
data = load_data()
svc = request.json
key = svc.get('key', '').strip()
if not key:
return jsonify({'error': 'Ключ обязателен'}), 400
if key in data['services']:
return jsonify({'error': f'Сервис "{key}" уже существует'}), 400
data['services'][key] = {
'name': svc.get('name', key),
'sport': svc.get('sport', 'any'),
'dport': svc.get('dport', ''),
'proto': svc.get('proto', 'tcp'),
}
save_data(data)
return jsonify({'ok': True, 'key': key})
@app.route('/api/services/<key>', methods=['PUT'])
def update_service(key):
data = load_data()
svc = request.json
data['services'][key] = {
'name': svc.get('name', key),
'sport': svc.get('sport', 'any'),
'dport': svc.get('dport', ''),
'proto': svc.get('proto', 'tcp'),
}
save_data(data)
return jsonify({'ok': True})
@app.route('/api/services/<key>', methods=['DELETE'])
def delete_service(key):
data = load_data()
data['services'].pop(key, None)
save_data(data)
return jsonify({'ok': True})
# ─── API: Группы сервисов ─────────────────────────────────────────────────────
@app.route('/api/service_groups', methods=['GET'])
def get_service_groups():
data = load_data()
return jsonify(data['service_groups'])
@app.route('/api/service_groups', methods=['POST'])
def create_service_group():
data = load_data()
sg = request.json
key = sg.get('key', '').strip()
if not key:
return jsonify({'error': 'Ключ обязателен'}), 400
if key in data['service_groups']:
return jsonify({'error': f'Группа сервисов "{key}" уже существует'}), 400
data['service_groups'][key] = {
'name': sg.get('name', key),
'items': sg.get('items', [])
}
save_data(data)
return jsonify({'ok': True, 'key': key})
@app.route('/api/service_groups/<key>', methods=['PUT'])
def update_service_group(key):
data = load_data()
sg = request.json
data['service_groups'][key] = {
'name': sg.get('name', key),
'items': sg.get('items', [])
}
save_data(data)
return jsonify({'ok': True})
@app.route('/api/service_groups/<key>', methods=['DELETE'])
def delete_service_group(key):
data = load_data()
data['service_groups'].pop(key, None)
save_data(data)
return jsonify({'ok': True})
# ─── API: Правила ─────────────────────────────────────────────────────────────
@app.route('/api/rules', methods=['GET'])
def get_rules():
data = load_data()
return jsonify(data['rules'])
@app.route('/api/rules', methods=['POST'])
def create_rule():
data = load_data()
rule = request.json
data['rules'].append(rule)
save_data(data)
return jsonify({'ok': True, 'index': len(data['rules']) - 1})
@app.route('/api/rules/<int:idx>', methods=['PUT'])
def update_rule(idx):
data = load_data()
if idx < 0 or idx >= len(data['rules']):
return jsonify({'error': 'Индекс вне диапазона'}), 404
data['rules'][idx] = request.json
save_data(data)
return jsonify({'ok': True})
@app.route('/api/rules/<int:idx>', methods=['DELETE'])
def delete_rule(idx):
data = load_data()
if idx < 0 or idx >= len(data['rules']):
return jsonify({'error': 'Индекс вне диапазона'}), 404
data['rules'].pop(idx)
save_data(data)
return jsonify({'ok': True})
@app.route('/api/rules/reorder', methods=['POST'])
def reorder_rules():
data = load_data()
new_order = request.json.get('order', [])
try:
data['rules'] = [data['rules'][i] for i in new_order]
except IndexError:
return jsonify({'error': 'Неверный порядок'}), 400
save_data(data)
return jsonify({'ok': True})
# ─── API: Экспорт fw_settings.py ─────────────────────────────────────────────
@app.route('/api/export', methods=['GET'])
def export_settings():
data = load_data()
content = generate_fw_settings(data)
return app.response_class(
response=content,
status=200,
mimetype='text/plain; charset=utf-8',
headers={'Content-Disposition': 'attachment; filename=fw_settings.py'}
)
def py_repr(val):
"""Представление Python-значения в виде строки."""
if isinstance(val, str):
# Используем двойные кавычки
escaped = val.replace('\\', '\\\\').replace('"', '\\"')
return f'"{escaped}"'
elif isinstance(val, list):
items = ', '.join(py_repr(i) for i in val)
return f'[{items}]'
elif isinstance(val, dict):
pairs = ', '.join(f'{py_repr(k)}: {py_repr(v)}' for k, v in val.items())
return '{' + pairs + '}'
elif isinstance(val, bool):
return 'True' if val else 'False'
elif val is None:
return 'None'
else:
return repr(val)
def generate_fw_settings(data):
lines = []
# servers
lines.append('servers = {')
for key, obj in data['servers'].items():
lines.append(f' {py_repr(key)}: {{')
lines.append(f' "hostname": {py_repr(obj.get("hostname", key))},')
lines.append(f' "ip": {py_repr(obj.get("ip", ""))},')
lines.append(f' "prefix": {py_repr(obj.get("prefix", "24"))},')
lines.append(f' "gw": {py_repr(obj.get("gw", ""))},')
lines.append(f' "domain": {py_repr(obj.get("domain", ""))},')
lines.append(f' "description": {py_repr(obj.get("description", ""))},')
lines.append(f' "type": "host",')
affinity = obj.get('affinity', [])
lines.append(f' "affinity": {py_repr(affinity)},')
lines.append(f' }},')
lines.append('}')
lines.append('')
# nets
lines.append('# networks')
lines.append('nets = {')
for key, obj in data['nets'].items():
lines.append(f' {py_repr(key)}: {{')
lines.append(f' "hostname": {py_repr(obj.get("hostname", key))},')
lines.append(f' "description": {py_repr(obj.get("description", ""))},')
lines.append(f' "domain": {py_repr(obj.get("domain", ""))},')
lines.append(f' "ip": {py_repr(obj.get("ip", ""))},')
prefix = obj.get('prefix', 24)
try:
prefix = int(prefix)
except (ValueError, TypeError):
pass
lines.append(f' "prefix": {prefix},')
lines.append(f' "type": "network",')
affinity = obj.get('affinity', [])
lines.append(f' "affinity": {py_repr(affinity)},')
lines.append(f' }},')
lines.append('}')
lines.append('')
lines.append('')
# groups
lines.append('groups = {')
for key, grp in data['groups'].items():
items = grp.get('items', [])
items_repr = '[' + ', '.join(f'{{"hostname": {py_repr(i.get("hostname",""))}}}' for i in items) + ']'
lines.append(f' {py_repr(key)}: {{"name": {py_repr(grp.get("name", key))}, "items": {items_repr}}},')
lines.append('}')
lines.append('')
lines.append('')
# services
lines.append('# services')
lines.append('services = {')
for key, svc in data['services'].items():
lines.append(f' {py_repr(key)}: {{')
lines.append(f' "name": {py_repr(svc.get("name", key))},')
lines.append(f' "sport": {py_repr(svc.get("sport", "any"))},')
lines.append(f' "dport": {py_repr(svc.get("dport", ""))},')
lines.append(f' "proto": {py_repr(svc.get("proto", "tcp"))},')
lines.append(f' }},')
lines.append('}')
lines.append('')
# service groups
lines.append('# service groups')
lines.append('service_groups = {')
for key, sg in data['service_groups'].items():
items = sg.get('items', [])
lines.append(f' {py_repr(key)}: {{')
lines.append(f' "name": {py_repr(sg.get("name", key))},')
lines.append(f' "items": [')
for svc_key in items:
lines.append(f' services[{py_repr(svc_key)}],')
lines.append(f' ],')
lines.append(f' }},')
lines.append('}')
lines.append('')
# rules
lines.append('# rules')
lines.append('rules = [')
for rule in data['rules']:
lines.append(' {')
lines.append(f' "name": {py_repr(rule.get("name", ""))},')
lines.append(f' "order": {rule.get("order", 0)},')
rtype = rule.get('type', 'rule')
lines.append(f' "type": {py_repr(rtype)},')
if rtype == 'span':
affinity = rule.get('affinity', [])
lines.append(f' "affinity": {py_repr(affinity)},')
else:
lines.append(f' "description": {py_repr(rule.get("description", ""))},')
# src_list
src_list = rule.get('src_list', [])
src_parts = _rule_ref_list(src_list)
lines.append(f' "src_list": [{", ".join(src_parts)}],')
# dst_list
dst_list = rule.get('dst_list', [])
dst_parts = _rule_ref_list(dst_list)
lines.append(f' "dst_list": [{", ".join(dst_parts)}],')
# service_list
svc_list = rule.get('service_list', [])
svc_parts = [f'services[{py_repr(s)}]' for s in svc_list] if svc_list else []
lines.append(f' "service_list": [{", ".join(svc_parts)}],')
# service_group_list
sg_list = rule.get('service_group_list', [])
sg_parts = [f'service_groups[{py_repr(s)}]' for s in sg_list] if sg_list else []
sg_val = f'[{", ".join(sg_parts)}]' if sg_parts else 'None'
lines.append(f' "service_group_list": {sg_val},')
lines.append(f' "action": {py_repr(rule.get("action", "allow"))},')
lines.append(f' "log": {py_repr(rule.get("log", "false"))},')
lines.append(f' "idp": {py_repr(rule.get("idp", "false"))},')
affinity = rule.get('affinity', [])
lines.append(f' "affinity": {py_repr(affinity)},')
lines.append(' },')
lines.append(']')
lines.append('')
return '\n'.join(lines)
def _rule_ref_list(items):
"""Генерирует Python-ссылки для src_list/dst_list."""
parts = []
for item in items:
ref_type = item.get('ref_type', 'group')
ref_key = item.get('ref_key', '')
if ref_type == 'server':
parts.append(f'servers[{py_repr(ref_key)}]')
elif ref_type == 'net':
parts.append(f'nets[{py_repr(ref_key)}]')
else:
parts.append(f'groups[{py_repr(ref_key)}]')
return parts
# ─── API: Импорт fw_settings.py ──────────────────────────────────────────────
@app.route('/api/import', methods=['POST'])
def import_settings():
"""Импортирует данные из fw_settings.py через exec()."""
try:
fw_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'fw_settings.py')
if not os.path.exists(fw_path):
return jsonify({'error': 'fw_settings.py не найден'}), 404
ns = {}
with open(fw_path, 'r', encoding='utf-8') as f:
source = f.read()
exec(compile(source, fw_path, 'exec'), ns)
raw_servers = ns.get('servers', {})
raw_nets = ns.get('nets', {})
raw_groups = ns.get('groups', {})
raw_services = ns.get('services', {})
raw_service_groups = ns.get('service_groups', {})
raw_rules = ns.get('rules', [])
data = {
'servers': {},
'nets': {},
'groups': {},
'services': {},
'service_groups': {},
'rules': []
}
# Серверы
for k, v in raw_servers.items():
data['servers'][k] = {
'hostname': v.get('hostname', k),
'ip': v.get('ip', ''),
'prefix': str(v.get('prefix', '24')),
'gw': v.get('gw', ''),
'domain': v.get('domain', ''),
'description': v.get('description', ''),
'type': 'host',
'affinity': v.get('affinity', []),
}
# Сети
for k, v in raw_nets.items():
data['nets'][k] = {
'hostname': v.get('hostname', k),
'ip': v.get('ip', ''),
'prefix': str(v.get('prefix', '24')),
'gw': v.get('gw', ''),
'domain': v.get('domain', ''),
'description': v.get('description', ''),
'type': 'network',
'affinity': v.get('affinity', []),
}
# Группы
for k, v in raw_groups.items():
items = []
for item in v.get('items', []):
if isinstance(item, dict):
items.append({'hostname': item.get('hostname', '')})
data['groups'][k] = {
'name': v.get('name', k),
'items': items
}
# Сервисы
for k, v in raw_services.items():
data['services'][k] = {
'name': v.get('name', k),
'sport': v.get('sport', 'any'),
'dport': v.get('dport', ''),
'proto': v.get('proto', 'tcp'),
}
# Группы сервисов — сохраняем ключи сервисов
svc_by_name = {v['name']: k for k, v in raw_services.items()}
for k, v in raw_service_groups.items():
item_keys = []
for svc in v.get('items', []):
if isinstance(svc, dict):
svc_name = svc.get('name', '')
# Ищем ключ по имени
found_key = svc_by_name.get(svc_name)
if found_key:
item_keys.append(found_key)
else:
# Ищем по совпадению значений
for sk, sv in raw_services.items():
if sv == svc:
item_keys.append(sk)
break
data['service_groups'][k] = {
'name': v.get('name', k),
'items': item_keys
}
# Правила — конвертируем ссылки в ref_type/ref_key
all_server_ids = {id(v): k for k, v in raw_servers.items()}
all_net_ids = {id(v): k for k, v in raw_nets.items()}
all_group_ids = {id(v): k for k, v in raw_groups.items()}
all_svc_ids = {id(v): k for k, v in raw_services.items()}
all_sg_ids = {id(v): k for k, v in raw_service_groups.items()}
for rule in raw_rules:
rtype = rule.get('type', 'rule')
new_rule = {
'name': rule.get('name', ''),
'order': rule.get('order', 0),
'type': rtype,
'affinity': rule.get('affinity', []),
}
if rtype != 'span':
new_rule['description'] = rule.get('description', '')
new_rule['action'] = rule.get('action', 'allow')
new_rule['log'] = str(rule.get('log', 'false'))
new_rule['idp'] = str(rule.get('idp', 'false'))
# src_list
src_list = []
for item in (rule.get('src_list') or []):
ref = _resolve_ref(item, all_server_ids, all_net_ids, all_group_ids)
if ref:
src_list.append(ref)
new_rule['src_list'] = src_list
# dst_list
dst_list = []
for item in (rule.get('dst_list') or []):
ref = _resolve_ref(item, all_server_ids, all_net_ids, all_group_ids)
if ref:
dst_list.append(ref)
new_rule['dst_list'] = dst_list
# service_list
svc_list = []
for svc in (rule.get('service_list') or []):
key = all_svc_ids.get(id(svc))
if key:
svc_list.append(key)
new_rule['service_list'] = svc_list
# service_group_list
sg_list = []
for sg in (rule.get('service_group_list') or []):
key = all_sg_ids.get(id(sg))
if key:
sg_list.append(key)
new_rule['service_group_list'] = sg_list
data['rules'].append(new_rule)
save_data(data)
return jsonify({'ok': True, 'message': 'Импорт выполнен успешно'})
except Exception as e:
return jsonify({'error': str(e)}), 500
def _resolve_ref(item, server_ids, net_ids, group_ids):
obj_id = id(item)
if obj_id in server_ids:
return {'ref_type': 'server', 'ref_key': server_ids[obj_id]}
if obj_id in net_ids:
return {'ref_type': 'net', 'ref_key': net_ids[obj_id]}
if obj_id in group_ids:
return {'ref_type': 'group', 'ref_key': group_ids[obj_id]}
# Попробуем по hostname
if isinstance(item, dict):
hostname = item.get('hostname', '')
if hostname in server_ids.values():
return {'ref_type': 'server', 'ref_key': hostname}
if hostname in net_ids.values():
return {'ref_type': 'net', 'ref_key': hostname}
name = item.get('name', '')
if name in group_ids.values():
return {'ref_type': 'group', 'ref_key': name}
return None
# ─── API: Получить все данные сразу ──────────────────────────────────────────
@app.route('/api/all', methods=['GET'])
def get_all():
return jsonify(load_data())
if __name__ == '__main__':
app.run(debug=True, port=5000)