opt
/
imunify360
/
venv
/
lib
/
python3.11
/
site-packages
/
defence360agent
/
wordpress
➕ New
📤 Upload
✎ Editing:
changelog_processor.py
← Back
"""Processor for WordPress rule disable/enable changelog files. The PHP WordPress plugin writes rule change actions to changelog.php when a user disables or enables protection rules from the WordPress admin panel. This module reads, parses, and applies those actions to the agent database. The changelog.php file uses the same format as incident files: <?php __halt_compiler(); #{base64-encoded JSON for action 1} #{base64-encoded JSON for action 2} Each JSON action has the form: {"action": "disable"|"enable", "rule_id": "xyz", "ts": ...} The user_id stored with each action is the system UID of the WordPress site owner (site.uid). """ import logging from pathlib import Path from defence360agent.contracts.messages import MessageType from defence360agent.contracts.plugins import MessageSink from defence360agent.model.wordpress import WPSite, WordpressSite from defence360agent.model.wp_disabled_rule import WPDisabledRule from defence360agent.wordpress.cli import get_data_dir from defence360agent.wordpress.incident_parser import IncidentFileParser from defence360agent.wordpress.utils import parse_php_with_embedded_json logger = logging.getLogger(__name__) CHANGELOG_FILENAME = "changelog.php" DISABLED_RULES_FILENAME = "disabled-rules.php" ACTION_DISABLE = "disable" ACTION_ENABLE = "enable" class ChangelogProcessor: """Process WordPress rule disable/enable changelog files. Reads changelog.php from each site's data directory, applies disable/enable actions to the WPDisabledRule database, reports events to the correlation server, and deletes the file after processing. If no changelog exists (or no new entries), checks whether disabled-rules.php has been modified externally (e.g. backup restore) and flags the domain for regeneration. """ def __init__(self) -> None: # changelog.php uses the same format as incident files # (base64-encoded JSON lines wrapped in PHP), so we reuse the parser self.parser = IncidentFileParser() async def process_changelogs_for_sites( self, sites: list[WPSite], sink: MessageSink | None, ) -> list[WPSite]: """Process changelog.php for all given sites. Args: sites: WordPress sites to process. sink: MessageSink for sending correlation events. Returns: Sites whose disabled rules were affected (needing disabled-rules.php regeneration). """ affected: list[WPSite] = [] for site in sites: if await self._process_site(site, sink): affected.append(site) if affected: logger.info( "Changelog processing affected %d site(s)", len(affected), ) return affected async def _process_site( self, site: WPSite, sink: MessageSink | None, ) -> bool: """Process changelog.php for a single site. Args: site: WordPress site to process. sink: MessageSink for sending correlation events. Returns: True if the site's disabled rules were affected. """ try: data_dir = await get_data_dir(site) if not data_dir.exists(): return False changelog_path = data_dir / CHANGELOG_FILENAME if changelog_path.exists(): if await self._process_changelog_file( changelog_path, site, sink ): return True if self._is_disabled_rules_file_stale(site, data_dir): return True except Exception as e: logger.error( "Error processing changelog for site %s: %s", site.docroot, e, ) return False def _consume_changelog( self, changelog_path: Path, site: WPSite ) -> list[dict]: """Parse a changelog file and delete it. The file is deleted regardless of whether parsing succeeds. """ try: return self.parser.parse_file(changelog_path) except (OSError, ValueError) as e: logger.error( "Failed to parse changelog for site %s: %s", site.docroot, e, ) return [] finally: try: changelog_path.unlink(missing_ok=True) except OSError as e: logger.error( "Failed to delete changelog for site %s: %s", site.docroot, e, ) async def _process_changelog_file( self, changelog_path: Path, site: WPSite, sink: MessageSink | None, ) -> bool: """Parse and apply actions from a changelog file. The file is always deleted after reading, even on parse errors. Actions older than the last sync timestamp are skipped to prevent stale changelog files (e.g. from backup restores) from undoing more recent changes. Returns: True if any DB changes occurred. """ actions = self._consume_changelog(changelog_path, site) if not actions: return False last_sync_ts = self._get_last_sync_ts(site) changed = False for action in actions: try: timestamp = float(action.get("ts", 0)) if timestamp <= 0: raise ValueError( "Missing or invalid timestamp in changelog action" f" for rule {action.get('rule_id', '?')}" f" on site {site.docroot}" ) if last_sync_ts is not None and timestamp <= last_sync_ts: logger.info( "Skipping stale changelog action for rule %s" " on site %s (ts=%.0f <= sync_ts=%.0f)", action.get("rule_id", "?"), site.docroot, timestamp, last_sync_ts, ) continue if self._process_action(action, site, timestamp): changed = True await self._report_action(action, site, sink, timestamp) except ValueError as e: logger.warning("Skipping invalid changelog entry: %s", e) except Exception as e: logger.error( "Failed to process changelog action %s for site %s: %s", action, site.docroot, e, ) logger.info( "Processed changelog for site %s: %d action(s), changed=%s", site.docroot, len(actions), changed, ) return changed def _process_action( self, action: dict, site: WPSite, timestamp: float ) -> bool: """Apply a single changelog action to the database. Args: action: Parsed action dict with keys: action, rule_id, ts. site: The WordPress site the action belongs to. timestamp: Pre-resolved Unix timestamp for this action. Returns: True if the database state was modified. Raises: ValueError: If the action is missing required fields or has an unknown action type. """ action_type = action.get("action") rule_id = action.get("rule_id") if not action_type or not rule_id: raise ValueError( f"Missing action or rule_id in changelog entry: {action}" ) if action_type == ACTION_DISABLE: return self._apply_disable(rule_id, site, timestamp) elif action_type == ACTION_ENABLE: return self._apply_enable(rule_id, site) else: raise ValueError( f"Unknown changelog action '{action_type}'" f" for rule {rule_id} on site {site.docroot}" ) @staticmethod def _get_last_sync_ts(site: WPSite) -> float | None: """Get the last disabled-rules sync timestamp for a site. Returns None if the site has no DB record or no sync timestamp, meaning all actions should be processed. """ try: db_site = WordpressSite.get_by_id(site.docroot) return db_site.disabled_rules_sync_ts except WordpressSite.DoesNotExist: return None @staticmethod def _apply_disable(rule_id: str, site: WPSite, timestamp: float) -> bool: """Apply a disable action from the changelog. Returns: True if a new disable entry was created (not a no-op). """ count = WPDisabledRule.store( rule_id=rule_id, domains=[site.domain], source=WPDisabledRule.SOURCE_WORDPRESS, user_id=site.uid, timestamp=timestamp, ) return count > 0 def _apply_enable(self, rule_id: str, site: WPSite) -> bool: """Apply an enable action from the changelog. Returns: True if a disable entry was removed. """ count = WPDisabledRule.remove( rule_id=rule_id, domains=[site.domain], ) return count > 0 @staticmethod async def _report_action( action: dict, site: WPSite, sink: MessageSink | None, timestamp: float, ) -> None: """Send a rule change event to the correlation server. Must only be called for valid actions (after _process_action succeeds). """ if sink is None: return action_type = action["action"] rule_id = action["rule_id"] if action_type == ACTION_DISABLE: message_cls = MessageType.WPRuleDisabled elif action_type == ACTION_ENABLE: message_cls = MessageType.WPRuleEnabled else: return try: await sink.process_message( message_cls( plugin_id="wordpress", rule=rule_id, domains=[site.domain], timestamp=timestamp, user_id=site.uid, source=WPDisabledRule.SOURCE_WORDPRESS, ) ) except Exception as e: logger.error( "Failed to report changelog action for rule %s on site %s: %s", rule_id, site.docroot, e, ) @staticmethod def _is_disabled_rules_file_stale( site: WPSite, data_dir: Path, ) -> bool: """Check if disabled-rules.php was modified externally. Reads the embedded timestamp from the file and compares it against the stored sync timestamp in the database. If they differ (e.g. file restored from backup), returns True to trigger regeneration. """ disabled_rules_path = data_dir / DISABLED_RULES_FILENAME if not disabled_rules_path.exists(): return False try: content = disabled_rules_path.read_text() data = parse_php_with_embedded_json(content) file_ts = float(data.get("ts", 0)) except (OSError, ValueError) as e: logger.warning( "Cannot read disabled-rules.php for site %s: %s", site.docroot, e, ) return False try: db_site = WordpressSite.get_by_id(site.docroot) except WordpressSite.DoesNotExist: return False db_ts = db_site.disabled_rules_sync_ts return db_ts is None or abs(file_ts - db_ts) > 1.0
💾 Save Changes
Cancel
📤 Upload File
×
Select File
Upload
Cancel
➕ Create New
×
Type
📄 File
📁 Folder
Name
Create
Cancel
✎ Rename Item
×
Current Name
New Name
Rename
Cancel
🔐 Change Permissions
×
Target File
Permission (e.g., 0755, 0644)
0755
0644
0777
Apply
Cancel