top of page

Lambda Function Setup Guide for Security Group Event Notifications in Slack

Writer's picture: Lency KorienLency Korien

Overview

This document provides a step-by-step guide to creating a Lambda function that sends notifications to Slack when a security group rule is modified in AWS.


Prerequisites

  1. AWS Account with necessary permissions to create and configure Lambda, IAM, CloudTrail, and CloudWatch Logs.

  2. Slack workspace with permissions to create a new app and generate an incoming webhook URL.



Architecture

Security Group Rule Event -> CloudTrail -> CloudWatch Logs -> Lambda Function -> Slack


Step 1: Setup CloudTrail

  1. Go to CloudTrail Console:

  • Navigate to the AWS Management Console.

  • Go to the CloudTrail service.


  1. Create or Configure a Trail:


  • Create a new trail or use an existing one.

  • Ensure that the trail is configured to log management events.

  • Enable the trail to send logs to CloudWatch Logs.

Step 2: Setup CloudWatch Logs

  1. Create Log Group:

  • Navigate to CloudWatch in the AWS Management Console.

  • Create a new log group or use an existing one to receive CloudTrail logs

Step 3: Create Lambda Function

  1. Create a Lambda Function:

  • Go to the AWS Lambda Console.

  • Create a new Lambda function with a suitable name, runtime (Python 3.8+), and role with the necessary permissions.


  1. Add the Code mentioned below:

import boto3import jsonimport osimport base64import gzipimport subprocessec2_client = boto3.client('ec2', region_name='us-east-1')webhook_url = os.environ.get('WEBHOOK_URL')def slack_message(sg_id, username, port, cidr, action):    message = f""":siren: *Security Group Rules Alert* :siren:<!channel> Hey team,A security group rule has been modified. Here are the details: *Action:* `{action}` *Created By:* `{username}` *Security Group Id:* `{sg_id}` *Port Number:* `{port}` *CIDR:* `{cidr}` *Potential Impact:* Access rule change detected. *Actions Required:* Review the security group rules to ensure they meet your security requirements.Thank you for your attention to this matter! :hammer_and_spanner:"""    return messagedef send_alert(sg_id, username, port, cidr, action):    message = slack_message(sg_id, username, port, cidr, action)    payload = {        "channel": "",        "username": "Bot",        "text": message,        "icon_emoji": ":alert:"    }    json_payload = json.dumps(payload)    curl_command = ['curl', '-X', 'POST', '-H', 'Content-Type: application/json', '-d', json_payload, webhook_url]    response = subprocess.run(curl_command, check=True, capture_output=True, text=True)def parse_event(event):    if 'detail' in event:        action = event['detail']['eventName']        username = event['detail']['userIdentity']['principalId'].split(':')[-1]        sg_rule_sets = event['detail']['responseElements']['securityGroupRuleSet']        for sg_rule in sg_rule_sets['items']:            sg_id = sg_rule['groupId']            from_port = int(sg_rule['fromPort'])            to_port = int(sg_rule['toPort'])            cidr = sg_rule.get('cidrIpv4', sg_rule.get('cidrIpv6', 'N/A'))            send_alert(sg_id, username, to_port, cidr, action)    else:        # Log CloudWatch log events        log_data = event['awslogs']['data']        compressed_payload = base64.b64decode(log_data)        uncompressed_payload = gzip.decompress(compressed_payload)        log_events = json.loads(uncompressed_payload)['logEvents']        for log_event in log_events:            message = log_event['message']            log_entry = json.loads(message)            action = log_entry['eventName']            username = log_entry['userIdentity']['principalId'].split(':')[-1]            sg_rule_sets = log_entry['responseElements']['securityGroupRuleSet']            for sg_rule in sg_rule_sets['items']:                sg_id = sg_rule['groupId']                from_port = int(sg_rule['fromPort'])                to_port = int(sg_rule['toPort'])                cidr = sg_rule.get('cidrIpv4', sg_rule.get('cidrIpv6', 'N/A'))                send_alert(sg_id, username, to_port, cidr, action)def lambda_handler(event, context):    try:        parse_event(event)    except KeyError as e:        print(f"KeyError: {e}")# Example event structure for testingif __name__ == "__main__":    test_event = {        'awslogs': {            'data': base64.b64encode(gzip.compress(json.dumps({                'logEvents': [                    {                        'message': json.dumps({                            'eventName': 'AuthorizeSecurityGroupIngress',                            'userIdentity': {                                'principalId': 'AWS:TestUser',                            },                            'responseElements': {                                'securityGroupRuleSet': {                                    'items': [                                        {                                            'groupId': 'sg-12345678',                                            'fromPort': 22,                                            'toPort': 22,                                            'cidrIpv4': '0.0.0.0/0',                                        }                                    ]                                }                            }                        })                    }                ]            }).encode())).decode()        }    }    lambda_handler(test_event, None)

Set Environment Variable:

  • Set the WEBHOOK_URL environment variable to the Slack webhook URL you created.


Step 4: Configure CloudWatch Logs Filter Pattern

  1. Create Metric Filter:

  • Go to the CloudWatch Console.

  • Select the log group that receives CloudTrail logs.

  • Create a new metric filter with the following pattern:

  • { ($.eventName = AuthorizeSecurityGroupIngress) || ($.eventName = AuthorizeSecurityGroupEgress) || ($.eventName = RevokeSecurityGroupIngress) || ($.eventName = RevokeSecurityGroupEgress) || ($.eventName = CreateSecurityGroup) || ($.eventName = DeleteSecurityGroup) }

  • Provide a name for the filter and configure it.


Step 5: Add CloudWatch Logs Trigger to Lambda

  1. Add Trigger:

  • Go to the Lambda Console.

  • Select your Lambda function.

  • In the “Function overview” section, click “Add trigger”.

  • Select “CloudWatch Logs” as the trigger type.

  • Choose the log group that contains the CloudTrail logs.

  • Specify the filter pattern created earlier.

  • Click “Add”.


Step 6: Test the Setup

  1. Generate Test Events:

Modify a security group rule (e.g., authorize or revoke an ingress or egress rule) to generate test events.

  1. Verify Slack Notifications:

Check your Slack channel to ensure notifications are being received as expected.


You can check more info about: Lambda Function Setup.

6 views0 comments

Comments


bottom of page