fw_rules_builder/app/app.py

680 lines
24 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
app.py — Flask backend для Firewall Rules Builder
"""
import json
import os
import tempfile
from flask import Flask, jsonify, request, render_template
# Flask application object; template and static folder names are given
# explicitly here.
app = Flask(__name__, template_folder='templates', static_folder='static')
# ─── Data storage ────────────────────────────────────────────────────────────
# JSON file that holds all application data; it lives next to this module.
DATA_FILE = os.path.join(os.path.dirname(__file__), 'data.json')
# Shape of an empty data set: one dict per keyed collection plus a list of
# rules. Code in this file that needs a fresh data set copies this value
# through a JSON round-trip rather than handing out this object itself.
EMPTY_DATA = {
    "servers": {},
    "nets": {},
    "groups": {},
    "services": {},
    "service_groups": {},
    "rules": []
}
def load_data():
    """Load the application data set from ``DATA_FILE``.

    Returns:
        dict: The stored data, guaranteed to contain every top-level section
        named in ``EMPTY_DATA``. Sections absent from the file are filled
        with empty defaults. When the file does not exist, a fresh copy of
        ``EMPTY_DATA`` is returned.

    Raises:
        json.JSONDecodeError: If the file exists but is not valid JSON.
        ValueError: If the top-level JSON value in the file is not an object.
    """
    # JSON round-trip acts as a deep copy so callers never mutate EMPTY_DATA.
    data = json.loads(json.dumps(EMPTY_DATA))
    # Open directly instead of exists()+open(): the file may disappear between
    # the two calls.
    try:
        with open(DATA_FILE, 'r', encoding='utf-8') as f:
            stored = json.load(f)
    except FileNotFoundError:
        return data
    if not isinstance(stored, dict):
        raise ValueError(f'{DATA_FILE}: expected a JSON object at top level')
    # Stored sections win; missing ones keep their empty default so handlers
    # can index data['servers'], data['rules'], ... without a KeyError.
    data.update(stored)
    return data
def save_data(data):
    """Persist ``data`` to ``DATA_FILE`` as pretty-printed UTF-8 JSON.

    The JSON is written to a temporary file in the same directory and then
    moved over ``DATA_FILE`` with ``os.replace``. A failure while serializing
    or writing (for example a non-serializable value) therefore leaves the
    previous file intact instead of a truncated one.

    Args:
        data: JSON-serializable object to store.

    Raises:
        TypeError: If ``data`` contains values ``json.dump`` cannot serialize.
        OSError: If the temporary file cannot be created, written or moved.
    """
    # The temp file must be on the same filesystem as the target for
    # os.replace to be a rename, hence dir=<directory of DATA_FILE>.
    target_dir = os.path.dirname(DATA_FILE) or '.'
    # NOTE(review): mkstemp creates the file with owner-only permissions, so
    # data.json ends up 0600 rather than umask-default; confirm nothing else
    # needs to read it under a different user.
    fd, tmp_path = tempfile.mkstemp(dir=target_dir, prefix='data.', suffix='.tmp')
    try:
        with os.fdopen(fd, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        os.replace(tmp_path, DATA_FILE)
    except BaseException:
        # Best-effort cleanup of the partial temp file; the original error is
        # what the caller needs to see, so a failed unlink is ignored.
        try:
            os.unlink(tmp_path)
        except OSError:
            pass
        raise
# ─── Главная страница ─────────────────────────────────────────────────────────
@app.route('/')
def index():
    """Serve the main page by rendering the ``index.html`` template."""
    page = render_template('index.html')
    return page
# ─── API: Объекты (servers + nets) ───────────────────────────────────────────
@app.route('/api/objects', methods=['GET'])
def get_objects():
    """Return hosts and networks merged into one JSON object.

    Networks are merged after servers, so a network stored under the same
    key as a server replaces that server in the response.
    """
    store = load_data()
    merged = {**store['servers'], **store['nets']}
    return jsonify(merged)
@app.route('/api/objects', methods=['POST'])
def create_object():
    """Create a host or network object from the JSON request body.

    The body supplies ``key`` (required) and optionally ``type``
    (``'network'`` stores the object under ``nets``; anything else under
    ``servers``), ``hostname``, ``ip``, ``prefix``, ``gw``, ``domain``,
    ``description`` and ``affinity``.

    Returns:
        ``{'ok': True, 'key': key}`` on success, or an ``error`` payload with
        status 400 when the key is blank or already in use.
    """
    data = load_data()
    obj = request.json
    key = obj.get('key', '').strip()
    if not key:
        return jsonify({'error': 'Ключ обязателен'}), 400
    # Hosts and networks share one key namespace at the API level: the GET
    # handler merges both dicts and the PUT/DELETE handlers act on both. A
    # key already used by either kind must therefore be rejected, otherwise
    # one object would shadow the other.
    if key in data['servers'] or key in data['nets']:
        return jsonify({'error': f'Объект "{key}" уже существует'}), 400
    obj_type = obj.get('type', 'host')
    obj_data = {
        'hostname': obj.get('hostname', key),
        'ip': obj.get('ip', ''),
        'prefix': obj.get('prefix', '24'),
        'gw': obj.get('gw', ''),
        'domain': obj.get('domain', ''),
        'description': obj.get('description', ''),
        'type': obj_type,
        'affinity': obj.get('affinity', []),
    }
    target = 'nets' if obj_type == 'network' else 'servers'
    data[target][key] = obj_data
    save_data(data)
    return jsonify({'ok': True, 'key': key})
@app.route('/api/objects/<key>', methods=['PUT'])
def update_object(key):
    """Replace the object stored under ``key`` with the JSON request body.

    The object is removed from both ``servers`` and ``nets`` first, because
    its type may have changed, and is then stored in the collection that
    matches the new ``type``. Fields missing from the body get defaults.
    """
    store = load_data()
    body = request.json
    kind = body.get('type', 'host')
    field_defaults = (
        ('hostname', key),
        ('ip', ''),
        ('prefix', '24'),
        ('gw', ''),
        ('domain', ''),
        ('description', ''),
    )
    record = {name: body.get(name, default) for name, default in field_defaults}
    record['type'] = kind
    record['affinity'] = body.get('affinity', [])
    # Drop any previous copy from either collection before re-inserting.
    for bucket in ('servers', 'nets'):
        store[bucket].pop(key, None)
    destination = 'nets' if kind == 'network' else 'servers'
    store[destination][key] = record
    save_data(store)
    return jsonify({'ok': True})
@app.route('/api/objects/<key>', methods=['DELETE'])
def delete_object(key):
    """Remove ``key`` from both ``servers`` and ``nets``.

    A key that is not present is ignored; the response is always
    ``{'ok': True}``.
    """
    store = load_data()
    for bucket in ('servers', 'nets'):
        store[bucket].pop(key, None)
    save_data(store)
    return jsonify({'ok': True})
# ─── API: Группы объектов ─────────────────────────────────────────────────────
@app.route('/api/groups', methods=['GET'])
def get_groups():
    """Return the ``groups`` section of the stored data as JSON."""
    groups = load_data()['groups']
    return jsonify(groups)
@app.route('/api/groups', methods=['POST'])
def create_group():
    """Create an object group from the JSON request body.

    Responds with status 400 when ``key`` is blank or already exists;
    otherwise stores ``name`` (defaulting to the key) and ``items``
    (defaulting to an empty list) and returns the key.
    """
    store = load_data()
    body = request.json
    key = body.get('key', '').strip()
    if not key:
        missing = {'error': 'Ключ обязателен'}
        return jsonify(missing), 400
    groups = store['groups']
    if key in groups:
        duplicate = {'error': f'Группа "{key}" уже существует'}
        return jsonify(duplicate), 400
    groups[key] = dict(name=body.get('name', key), items=body.get('items', []))
    save_data(store)
    return jsonify({'ok': True, 'key': key})
@app.route('/api/groups/<key>', methods=['PUT'])
def update_group(key):
    """Store the group given in the JSON request body under ``key``.

    Any existing entry is overwritten; a missing entry is created.
    """
    store = load_data()
    body = request.json
    entry = {}
    entry['name'] = body.get('name', key)
    entry['items'] = body.get('items', [])
    store['groups'][key] = entry
    save_data(store)
    return jsonify({'ok': True})
@app.route('/api/groups/<key>', methods=['DELETE'])
def delete_group(key):
    """Delete the group stored under ``key``; an unknown key is ignored."""
    store = load_data()
    groups = store['groups']
    if key in groups:
        del groups[key]
    save_data(store)
    return jsonify({'ok': True})
# ─── API: Сервисы ─────────────────────────────────────────────────────────────
@app.route('/api/services', methods=['GET'])
def get_services():
    """Return the ``services`` section of the stored data as JSON."""
    services = load_data()['services']
    return jsonify(services)
@app.route('/api/services', methods=['POST'])
def create_service():
    """Create a service definition from the JSON request body.

    Responds with status 400 when ``key`` is blank or already exists.
    Missing fields default to: ``name`` = key, ``sport`` = ``'any'``,
    ``dport`` = ``''``, ``proto`` = ``'tcp'``.
    """
    store = load_data()
    body = request.json
    key = body.get('key', '').strip()
    if not key:
        return jsonify({'error': 'Ключ обязателен'}), 400
    catalog = store['services']
    if key in catalog:
        return jsonify({'error': f'Сервис "{key}" уже существует'}), 400
    fallbacks = {'name': key, 'sport': 'any', 'dport': '', 'proto': 'tcp'}
    catalog[key] = {
        field: body.get(field, default) for field, default in fallbacks.items()
    }
    save_data(store)
    return jsonify({'ok': True, 'key': key})
@app.route('/api/services/<key>', methods=['PUT'])
def update_service(key):
    """Store the service given in the JSON request body under ``key``.

    Any existing entry is overwritten; a missing entry is created. Fields
    absent from the body fall back to the same defaults used on creation.
    """
    store = load_data()
    body = request.json
    fallbacks = (('name', key), ('sport', 'any'), ('dport', ''), ('proto', 'tcp'))
    store['services'][key] = {
        field: body.get(field, default) for field, default in fallbacks
    }
    save_data(store)
    return jsonify({'ok': True})
@app.route('/api/services/<key>', methods=['DELETE'])
def delete_service(key):
    """Delete the service stored under ``key``; an unknown key is ignored."""
    store = load_data()
    catalog = store['services']
    if key in catalog:
        del catalog[key]
    save_data(store)
    return jsonify({'ok': True})
# ─── API: Группы сервисов ─────────────────────────────────────────────────────
@app.route('/api/service_groups', methods=['GET'])
def get_service_groups():
    """Return the ``service_groups`` section of the stored data as JSON."""
    service_groups = load_data()['service_groups']
    return jsonify(service_groups)
@app.route('/api/service_groups', methods=['POST'])
def create_service_group():
    """Create a service group from the JSON request body.

    Responds with status 400 when ``key`` is blank or already exists;
    otherwise stores ``name`` (defaulting to the key) and ``items``
    (defaulting to an empty list) and returns the key.
    """
    store = load_data()
    body = request.json
    key = body.get('key', '').strip()
    if not key:
        return jsonify({'error': 'Ключ обязателен'}), 400
    existing = store['service_groups']
    if key in existing:
        message = f'Группа сервисов "{key}" уже существует'
        return jsonify({'error': message}), 400
    group_name = body.get('name', key)
    members = body.get('items', [])
    existing[key] = {'name': group_name, 'items': members}
    save_data(store)
    return jsonify({'ok': True, 'key': key})
@app.route('/api/service_groups/<key>', methods=['PUT'])
def update_service_group(key):
    """Store the service group from the JSON request body under ``key``.

    Any existing entry is overwritten; a missing entry is created.
    """
    store = load_data()
    body = request.json
    group_name = body.get('name', key)
    members = body.get('items', [])
    store['service_groups'][key] = {'name': group_name, 'items': members}
    save_data(store)
    return jsonify({'ok': True})
@app.route('/api/service_groups/<key>', methods=['DELETE'])
def delete_service_group(key):
    """Delete the service group under ``key``; an unknown key is ignored."""
    store = load_data()
    existing = store['service_groups']
    if key in existing:
        del existing[key]
    save_data(store)
    return jsonify({'ok': True})
# ─── API: Правила ─────────────────────────────────────────────────────────────
@app.route('/api/rules', methods=['GET'])
def get_rules():
    """Return the stored list of rules as JSON."""
    rules = load_data()['rules']
    return jsonify(rules)
@app.route('/api/rules', methods=['POST'])
def create_rule():
    """Append the JSON request body to the rule list as a new rule.

    The body is stored as received. The response carries the index of the
    new rule, which is the last position in the list.
    """
    store = load_data()
    rules = store['rules']
    rules.append(request.json)
    save_data(store)
    last_index = len(rules) - 1
    return jsonify({'ok': True, 'index': last_index})
@app.route('/api/rules/<int:idx>', methods=['PUT'])
def update_rule(idx):
    """Replace the rule at position ``idx`` with the JSON request body.

    Responds with status 404 when ``idx`` is outside the rule list.
    """
    store = load_data()
    rules = store['rules']
    if not 0 <= idx < len(rules):
        return jsonify({'error': 'Индекс вне диапазона'}), 404
    rules[idx] = request.json
    save_data(store)
    return jsonify({'ok': True})
@app.route('/api/rules/<int:idx>', methods=['DELETE'])
def delete_rule(idx):
    """Remove the rule at position ``idx``.

    Responds with status 404 when ``idx`` is outside the rule list.
    """
    store = load_data()
    rules = store['rules']
    if not 0 <= idx < len(rules):
        return jsonify({'error': 'Индекс вне диапазона'}), 404
    del rules[idx]
    save_data(store)
    return jsonify({'ok': True})
@app.route('/api/rules/reorder', methods=['POST'])
def reorder_rules():
    """Reorder the stored rules according to the ``order`` list in the body.

    ``order`` must be a permutation of the current rule indices: each index
    from ``0`` to ``len(rules) - 1`` exactly once. Anything else (missing or
    partial list, duplicates, negative or non-integer entries) is rejected
    with status 400 and the stored rules are left untouched. Without this
    check, a partial or missing list would silently drop rules.

    Returns:
        ``{'ok': True}`` on success, or an ``error`` payload with status 400.
    """
    data = load_data()
    body = request.json
    new_order = body.get('order', []) if isinstance(body, dict) else None
    rules = data['rules']
    is_permutation = (
        isinstance(new_order, list)
        # bool is a subclass of int; exclude it so True/False are not
        # accepted as indices 1/0.
        and all(isinstance(i, int) and not isinstance(i, bool) for i in new_order)
        and sorted(new_order) == list(range(len(rules)))
    )
    if not is_permutation:
        return jsonify({'error': 'Неверный порядок'}), 400
    data['rules'] = [rules[i] for i in new_order]
    save_data(data)
    return jsonify({'ok': True})
# ─── API: Экспорт fw_settings.py ─────────────────────────────────────────────
@app.route('/api/export', methods=['GET'])
def export_settings():
    """Send the generated ``fw_settings.py`` text as a file download.

    The stored data is rendered by ``generate_fw_settings`` and returned as
    UTF-8 plain text with an attachment ``Content-Disposition`` header.
    """
    body = generate_fw_settings(load_data())
    download_headers = {
        'Content-Disposition': 'attachment; filename=fw_settings.py',
    }
    return app.response_class(
        response=body,
        status=200,
        mimetype='text/plain; charset=utf-8',
        headers=download_headers,
    )
def py_repr(val):
    """Return Python source text for ``val``, using double-quoted strings.

    Strings are written with double quotes; lists and dicts are rendered
    recursively; every other value (numbers, ``bool``, ``None``, ...) uses
    the built-in ``repr``.

    Args:
        val: Value to render.

    Returns:
        str: Source text for the value.
    """
    if isinstance(val, str):
        # json.dumps escapes backslashes and double quotes as before, and
        # additionally escapes newlines and other control characters, which
        # previously ended up raw inside the quotes and broke the generated
        # literal. Every JSON string escape is also a valid Python escape.
        # ensure_ascii=False keeps non-ASCII text readable.
        return json.dumps(val, ensure_ascii=False)
    if isinstance(val, list):
        return '[' + ', '.join(py_repr(i) for i in val) + ']'
    if isinstance(val, dict):
        pairs = ', '.join(f'{py_repr(k)}: {py_repr(v)}' for k, v in val.items())
        return '{' + pairs + '}'
    # repr() already yields 'True' / 'False' / 'None' for bools and None.
    return repr(val)
def generate_fw_settings(data):
    """Render the data set as the Python source text of ``fw_settings.py``.

    The text defines, in this order, ``servers``, ``nets``, ``groups``,
    ``services``, ``service_groups`` and ``rules``. Service groups and rules
    refer to earlier definitions by subscript (``services["key"]``,
    ``groups["key"]``, ...) instead of repeating their contents.

    Every value taken from ``data`` goes through ``py_repr`` so it is emitted
    as a quoted or otherwise well-formed literal. A network ``prefix`` that
    is not numeric and a rule ``order`` that is not a number are therefore
    written as string literals rather than pasted into the output verbatim.

    Args:
        data: Data set with the ``servers``, ``nets``, ``groups``,
            ``services``, ``service_groups`` and ``rules`` sections.

    Returns:
        str: The generated source, lines joined with ``'\\n'``.
    """
    lines = []
    # servers
    lines.append('servers = {')
    for key, obj in data['servers'].items():
        lines.append(f' {py_repr(key)}: {{')
        lines.append(f' "hostname": {py_repr(obj.get("hostname", key))},')
        lines.append(f' "ip": {py_repr(obj.get("ip", ""))},')
        lines.append(f' "prefix": {py_repr(obj.get("prefix", "24"))},')
        lines.append(f' "gw": {py_repr(obj.get("gw", ""))},')
        lines.append(f' "domain": {py_repr(obj.get("domain", ""))},')
        lines.append(f' "description": {py_repr(obj.get("description", ""))},')
        lines.append(f' "type": "host",')
        affinity = obj.get('affinity', [])
        lines.append(f' "affinity": {py_repr(affinity)},')
        lines.append(f' }},')
    lines.append('}')
    lines.append('')
    # nets
    lines.append('# networks')
    lines.append('nets = {')
    for key, obj in data['nets'].items():
        lines.append(f' {py_repr(key)}: {{')
        lines.append(f' "hostname": {py_repr(obj.get("hostname", key))},')
        lines.append(f' "description": {py_repr(obj.get("description", ""))},')
        lines.append(f' "domain": {py_repr(obj.get("domain", ""))},')
        lines.append(f' "ip": {py_repr(obj.get("ip", ""))},')
        prefix = obj.get('prefix', 24)
        try:
            prefix = int(prefix)
        except (ValueError, TypeError):
            # Not numeric: keep the value as is; py_repr below quotes it so
            # it cannot end up as bare text in the generated source.
            pass
        lines.append(f' "prefix": {py_repr(prefix)},')
        lines.append(f' "type": "network",')
        affinity = obj.get('affinity', [])
        lines.append(f' "affinity": {py_repr(affinity)},')
        lines.append(f' }},')
    lines.append('}')
    lines.append('')
    lines.append('')
    # groups
    lines.append('groups = {')
    for key, grp in data['groups'].items():
        items = grp.get('items', [])
        items_repr = '[' + ', '.join(f'{{"hostname": {py_repr(i.get("hostname",""))}}}' for i in items) + ']'
        lines.append(f' {py_repr(key)}: {{"name": {py_repr(grp.get("name", key))}, "items": {items_repr}}},')
    lines.append('}')
    lines.append('')
    lines.append('')
    # services
    lines.append('# services')
    lines.append('services = {')
    for key, svc in data['services'].items():
        lines.append(f' {py_repr(key)}: {{')
        lines.append(f' "name": {py_repr(svc.get("name", key))},')
        lines.append(f' "sport": {py_repr(svc.get("sport", "any"))},')
        lines.append(f' "dport": {py_repr(svc.get("dport", ""))},')
        lines.append(f' "proto": {py_repr(svc.get("proto", "tcp"))},')
        lines.append(f' }},')
    lines.append('}')
    lines.append('')
    # service groups: members are emitted as references into `services`
    lines.append('# service groups')
    lines.append('service_groups = {')
    for key, sg in data['service_groups'].items():
        items = sg.get('items', [])
        lines.append(f' {py_repr(key)}: {{')
        lines.append(f' "name": {py_repr(sg.get("name", key))},')
        lines.append(f' "items": [')
        for svc_key in items:
            lines.append(f' services[{py_repr(svc_key)}],')
        lines.append(f' ],')
        lines.append(f' }},')
    lines.append('}')
    lines.append('')
    # rules
    lines.append('# rules')
    lines.append('rules = [')
    for rule in data['rules']:
        lines.append(' {')
        lines.append(f' "name": {py_repr(rule.get("name", ""))},')
        order = rule.get('order', 0)
        if isinstance(order, str):
            # A numeric string is still written as a bare integer (the same
            # output as before); any other string is quoted by py_repr.
            try:
                order = int(order)
            except ValueError:
                pass
        lines.append(f' "order": {py_repr(order)},')
        rtype = rule.get('type', 'rule')
        lines.append(f' "type": {py_repr(rtype)},')
        if rtype == 'span':
            # Span entries carry only name/order/type/affinity.
            affinity = rule.get('affinity', [])
            lines.append(f' "affinity": {py_repr(affinity)},')
        else:
            lines.append(f' "description": {py_repr(rule.get("description", ""))},')
            # src_list
            src_parts = _rule_ref_list(rule.get('src_list', []))
            lines.append(f' "src_list": [{", ".join(src_parts)}],')
            # dst_list
            dst_parts = _rule_ref_list(rule.get('dst_list', []))
            lines.append(f' "dst_list": [{", ".join(dst_parts)}],')
            # service_list
            svc_parts = [f'services[{py_repr(s)}]' for s in rule.get('service_list', [])]
            lines.append(f' "service_list": [{", ".join(svc_parts)}],')
            # service_group_list: an empty list is written as None
            sg_parts = [f'service_groups[{py_repr(s)}]' for s in rule.get('service_group_list', [])]
            sg_val = f'[{", ".join(sg_parts)}]' if sg_parts else 'None'
            lines.append(f' "service_group_list": {sg_val},')
            lines.append(f' "action": {py_repr(rule.get("action", "allow"))},')
            lines.append(f' "log": {py_repr(rule.get("log", "false"))},')
            lines.append(f' "idp": {py_repr(rule.get("idp", "false"))},')
            affinity = rule.get('affinity', [])
            lines.append(f' "affinity": {py_repr(affinity)},')
        lines.append(' },')
    lines.append(']')
    lines.append('')
    return '\n'.join(lines)
def _rule_ref_list(items):
    """Build the Python reference expressions for a src_list/dst_list.

    Each item is turned into ``servers[...]``, ``nets[...]`` or
    ``groups[...]`` depending on its ``ref_type`` (``'server'``, ``'net'``,
    anything else means group), subscripted with its ``ref_key``.
    """
    def _container(kind):
        # Name of the generated dict the reference points into.
        if kind == 'server':
            return 'servers'
        if kind == 'net':
            return 'nets'
        return 'groups'

    return [
        f"{_container(entry.get('ref_type', 'group'))}[{py_repr(entry.get('ref_key', ''))}]"
        for entry in items
    ]
# ─── API: Сброс всех данных ──────────────────────────────────────────────────
@app.route('/api/reset', methods=['POST'])
def reset_data():
    """Overwrite the stored data with a fresh copy of ``EMPTY_DATA``."""
    # JSON round-trip yields an independent deep copy of the template.
    serialized = json.dumps(EMPTY_DATA)
    fresh = json.loads(serialized)
    save_data(fresh)
    return jsonify({'ok': True})
# ─── API: Загрузка fw_settings.py через браузер ──────────────────────────────
@app.route('/api/upload', methods=['POST'])
def upload_settings():
    """Accept an fw_settings.py file via multipart/form-data and import it.

    Responds with status 400 when the ``file`` part is missing or has an
    empty filename, and with status 500 carrying the exception text when
    reading, decoding or importing the file fails.
    """
    files = request.files
    if 'file' not in files:
        return jsonify({'error': 'Файл не передан'}), 400
    upload = files['file']
    if not upload.filename:
        return jsonify({'error': 'Имя файла пустое'}), 400
    try:
        raw = upload.read()
        text = raw.decode('utf-8')
        outcome = _parse_and_save(text)
    except Exception as exc:
        return jsonify({'error': str(exc)}), 500
    return outcome
def _parse_and_save(source):
    """Parse the source text of an fw_settings.py file and store its data.

    The text is executed and the resulting ``servers``, ``nets``,
    ``groups``, ``services``, ``service_groups`` and ``rules`` values are
    converted to the JSON-friendly layout kept in the data file. Object
    references inside service groups and rules are turned back into keys.

    Args:
        source: Text of the uploaded file.

    Returns:
        A JSON response: a success message, or an ``error`` payload with
        status 500 carrying the exception text if anything fails.
    """
    try:
        # SECURITY: exec() runs the uploaded text as arbitrary Python inside
        # the server process. Only accept files from trusted users; replacing
        # this with a non-executing parser needs a separate design decision.
        ns = {}
        exec(compile(source, '<uploaded>', 'exec'), ns)
        raw_servers = ns.get('servers', {})
        raw_nets = ns.get('nets', {})
        raw_groups = ns.get('groups', {})
        raw_services = ns.get('services', {})
        raw_service_groups = ns.get('service_groups', {})
        raw_rules = ns.get('rules', [])
        data = {
            'servers': {},
            'nets': {},
            'groups': {},
            'services': {},
            'service_groups': {},
            'rules': []
        }
        # Servers and networks share one layout; only the section and the
        # stored 'type' differ.
        for raw_objects, section, obj_type in (
            (raw_servers, 'servers', 'host'),
            (raw_nets, 'nets', 'network'),
        ):
            for k, v in raw_objects.items():
                data[section][k] = {
                    'hostname': v.get('hostname', k),
                    'ip': v.get('ip', ''),
                    'prefix': str(v.get('prefix', '24')),
                    'gw': v.get('gw', ''),
                    'domain': v.get('domain', ''),
                    'description': v.get('description', ''),
                    'type': obj_type,
                    'affinity': v.get('affinity', []),
                }
        # Groups: keep only the hostname of each dict item
        for k, v in raw_groups.items():
            items = [
                {'hostname': item.get('hostname', '')}
                for item in v.get('items', [])
                if isinstance(item, dict)
            ]
            data['groups'][k] = {
                'name': v.get('name', k),
                'items': items
            }
        # Services
        for k, v in raw_services.items():
            data['services'][k] = {
                'name': v.get('name', k),
                'sport': v.get('sport', 'any'),
                'dport': v.get('dport', ''),
                'proto': v.get('proto', 'tcp'),
            }
        # Identity maps: a reference such as services["x"] in the executed
        # file is the very same object as the dict value, so id() recovers
        # the key it was stored under.
        all_server_ids = {id(v): k for k, v in raw_servers.items()}
        all_net_ids = {id(v): k for k, v in raw_nets.items()}
        all_group_ids = {id(v): k for k, v in raw_groups.items()}
        all_svc_ids = {id(v): k for k, v in raw_services.items()}
        all_sg_ids = {id(v): k for k, v in raw_service_groups.items()}
        # Service groups: store the keys of the member services.
        # .get() so a service without 'name' does not abort the whole import.
        svc_by_name = {v.get('name', k): k for k, v in raw_services.items()}
        for k, v in raw_service_groups.items():
            item_keys = []
            for svc in v.get('items', []):
                if not isinstance(svc, dict):
                    continue
                # 1) same object as a services entry (exact, immune to
                #    duplicate names), 2) lookup by name, 3) equal contents.
                found_key = all_svc_ids.get(id(svc))
                if found_key is None:
                    found_key = svc_by_name.get(svc.get('name', ''))
                if found_key is None:
                    found_key = next(
                        (sk for sk, sv in raw_services.items() if sv == svc),
                        None,
                    )
                if found_key is not None:
                    item_keys.append(found_key)
            data['service_groups'][k] = {
                'name': v.get('name', k),
                'items': item_keys
            }
        # Rules: convert object references into ref_type/ref_key pairs
        for rule in raw_rules:
            rtype = rule.get('type', 'rule')
            new_rule = {
                'name': rule.get('name', ''),
                'order': rule.get('order', 0),
                'type': rtype,
                'affinity': rule.get('affinity', []),
            }
            if rtype != 'span':
                new_rule['description'] = rule.get('description', '')
                new_rule['action'] = rule.get('action', 'allow')
                new_rule['log'] = str(rule.get('log', 'false'))
                new_rule['idp'] = str(rule.get('idp', 'false'))
                # src_list / dst_list: unresolvable entries are dropped.
                # `or []` covers a list explicitly set to None.
                for field in ('src_list', 'dst_list'):
                    refs = []
                    for item in (rule.get(field) or []):
                        ref = _resolve_ref(item, all_server_ids, all_net_ids, all_group_ids)
                        if ref:
                            refs.append(ref)
                    new_rule[field] = refs
                # service_list
                svc_list = []
                for svc in (rule.get('service_list') or []):
                    key = all_svc_ids.get(id(svc))
                    if key:
                        svc_list.append(key)
                new_rule['service_list'] = svc_list
                # service_group_list
                sg_list = []
                for sg in (rule.get('service_group_list') or []):
                    key = all_sg_ids.get(id(sg))
                    if key:
                        sg_list.append(key)
                new_rule['service_group_list'] = sg_list
            data['rules'].append(new_rule)
        save_data(data)
        return jsonify({'ok': True, 'message': 'Импорт выполнен успешно'})
    except Exception as e:
        # Any failure while executing or converting the upload is reported
        # to the client instead of propagating.
        return jsonify({'error': str(e)}), 500
def _resolve_ref(item, server_ids, net_ids, group_ids):
obj_id = id(item)
if obj_id in server_ids:
return {'ref_type': 'server', 'ref_key': server_ids[obj_id]}
if obj_id in net_ids:
return {'ref_type': 'net', 'ref_key': net_ids[obj_id]}
if obj_id in group_ids:
return {'ref_type': 'group', 'ref_key': group_ids[obj_id]}
# Попробуем по hostname
if isinstance(item, dict):
hostname = item.get('hostname', '')
if hostname in server_ids.values():
return {'ref_type': 'server', 'ref_key': hostname}
if hostname in net_ids.values():
return {'ref_type': 'net', 'ref_key': hostname}
name = item.get('name', '')
if name in group_ids.values():
return {'ref_type': 'group', 'ref_key': name}
return None
# ─── API: Получить все данные сразу ──────────────────────────────────────────
@app.route('/api/all', methods=['GET'])
def get_all():
    """Return the entire stored data set as one JSON document."""
    snapshot = load_data()
    return jsonify(snapshot)
# Script entry point: start the application's built-in server on port 5000.
# NOTE(review): debug=True switches on Flask's debug mode — presumably meant
# for local development only; confirm this entry point is never used for a
# deployment reachable by untrusted clients.
if __name__ == '__main__':
    app.run(debug=True, port=5000)