How to Build a Scalable Data Mesh in AWS with Lake Formation | HackerNoon

News Room · Published 17 February 2025

In today’s world, where data is power, traditional centralized designs often become chokepoints that impede both data access and innovation. Data Mesh is a modern approach that decentralizes ownership and treats data as a product managed by domain teams. AWS Lake Formation (LF) simplifies secure cross-account sharing and governance: it centralizes fine-grained control over what users can do across accounts and helps track compliance with regulations and standards. By leveraging Lake Formation, AWS Lambda, SQS, IAM, and S3, an organization can implement a truly scalable Data Mesh architecture that fosters self-serve analytics, interoperability, and federated governance without compromising security.

Architecture Overview

Figure: Data Mesh using Lake Formation

The architecture follows a Data Mesh design spanning two AWS accounts: Account A (Producer) and Account B (Consumer). The objective is to securely share AWS Glue Data Catalog tables from Account A with Account B using AWS Lake Formation (LF), S3, Lambda, and Amazon SQS.

Intake Process and Manifest File

Here, the manifest.json file is the single most important configuration file in the system: it spells out who gets access to what, including the role, database name, account ID, and the permissions granted. Within our firm, the service intake procedure runs through ServiceNow: a requester raises a ServiceNow (SNOW) ticket with each relevant piece of information captured in structured forms. Once the ticket is approved in our backend systems, the resulting manifest.json file is generated and placed into an S3 bucket in Account A.

Data Sharing Process

  1. Producer Lambda in Account A
    • An AWS Lambda function (producer.py) is triggered whenever a manifest.json file is dropped into the Account A S3 bucket.

    • The producer Lambda checks the validity of the request and verifies the following:

      • Whether the request is for same-account or cross-account access.
      • Whether the S3 bucket is registered in AWS Lake Formation (LF).
    • Once validation is complete, the producer Lambda sends a message to an Amazon SQS queue in Account B.

    • This message includes details about the AWS Resource Access Manager (RAM) share, which facilitates cross-account resource sharing (a sketch of the message payload appears after this list).

  2. Consumer Lambda in Account B
    • Upon receiving the SQS message, a consumer Lambda function (consumer.py) in Account B is triggered.
    • It processes the request and grants the necessary permissions in Lake Formation for Account B’s role to access the shared Glue Data Catalog tables from Account A.
    • Once access is granted, the consumer Lambda creates a corresponding database/table entry (a resource link) in Account B so that users can query the shared data.
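
For reference, below is a minimal sketch of the SQS message body. The field names are taken from what consumer.py reads; the exact payload shape is an assumption, since the sample producer code does not show the publish step, and the ARN values are placeholders.

{
    "access_type": "grant",
    "cross_role": "arn:aws:iam::112233445566:role/analyst-role",
    "db_name": "sales_db",
    "table_name": "orders",
    "permissions": ["SELECT"],
    "wild_card": false,
    "ram_url": "arn:aws:ram:us-east-1:999999999999:resource-share/example-share-id"
}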

This provides an automated, scalable architecture for secure, governed, and efficient data sharing across AWS accounts, with compliance maintained through AWS Lake Formation, IAM roles, and the AWS Glue Data Catalog.

Below are the sample JSON, producer code, and consumer code that can be used to build and run the architecture described above.

Manifest – manifest.json

{
    "Records": [{
        "AccessType": "grant",
        "Principal": "arn of IAM user/role in Account A (if granting same-account)",
        "Table": {
            "DatabaseName": "database name",
            "Name": "table name",
            "Wildcard": false
        },
        "Permissions": ["SELECT"],
        "Cross_account": true,
        "AccountID": "112233445566",
        "Cross_role": "arn of cross-account IAM user/role (if granting cross-account)"
    }]
}
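
Revocations use the same structure with AccessType set to revoke, which the producer and consumer handlers branch on. A hypothetical same-account example (all values are placeholders):

{
    "Records": [{
        "AccessType": "revoke",
        "Principal": "arn:aws:iam::111111111111:role/analyst-role",
        "Table": {
            "DatabaseName": "sales_db",
            "Name": "orders",
            "Wildcard": false
        },
        "Permissions": ["SELECT"],
        "Cross_account": false,
        "AccountID": "111111111111",
        "Cross_role": ""
    }]
}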

Producer Code – producer.py

import boto3
import json
import logging
import os
from botocore.exceptions import ClientError

# Logger setup
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Environment Variables
ADMIN_ROLE_ARN = os.environ['ADMIN_ROLE_ARN']
LF_REGISTER_ROLE = os.environ['LF_ROLE']
QUEUE_URL = os.environ['CROSS_QUEUE_URL']
S3_POLICY_ARN = os.environ['S3_POLICY_ARN']
S3_POLICY_NAME = os.environ['S3_POLICY_NAME']


def get_ram_invite(ram_client):
    """Retrieve Resource Access Manager (RAM) invite for cross-account sharing."""
    try:
        response = ram_client.get_resource_share_associations(
            associationType='PRINCIPAL', associationStatus='ASSOCIATING'
        )
        return response['resourceShareAssociations'][0]['resourceShareArn']
    except Exception as e:
        logger.error(f"Error retrieving RAM invite: {e}")
        raise


def delete_oldest_policy_version(iam_client, policy_arn):
    """Deletes the oldest version of an IAM policy (max 5 versions allowed)."""
    try:
        versions = iam_client.list_policy_versions(PolicyArn=policy_arn)['Versions']
        non_default_versions = [v['VersionId'] for v in versions if not v['IsDefaultVersion']]

        if non_default_versions:
            # Version IDs look like 'v1', 'v2', ...; compare numerically so 'v10' sorts after 'v9'
            oldest_version = min(non_default_versions, key=lambda v: int(v.lstrip('v')))
            iam_client.delete_policy_version(PolicyArn=policy_arn, VersionId=oldest_version)
    except Exception as e:
        logger.error(f"Error deleting old policy version: {e}")
        raise


def update_lf_s3_policy(iam_client, iam_resource, bucket_name):
    """Modifies the Lake Formation IAM policy to include S3 paths."""
    try:
        account_id = boto3.client('sts').get_caller_identity()['Account']
        policy_arn = f'arn:aws:iam::{account_id}:policy/{S3_POLICY_NAME}'
        policy = iam_resource.Policy(policy_arn)
        policy_json = policy.default_version.document

        s3_arn = f'arn:aws:s3:::{bucket_name}'
        updated = False

        for statement in policy_json['Statement']:
            # Check for the same wildcard ARN that gets appended, so repeat runs stay idempotent
            if f'{s3_arn}/*' not in statement['Resource']:
                statement['Resource'].append(f'{s3_arn}/*')
                updated = True

        if updated:
            delete_oldest_policy_version(iam_client, policy_arn)
            iam_client.create_policy_version(
                PolicyArn=policy_arn,
                PolicyDocument=json.dumps(policy_json),
                SetAsDefault=True
            )
    except Exception as e:
        logger.error(f"Error updating LF S3 policy: {e}")
        raise


def register_s3_location(iam_client, s3_client, glue_client, lf_client, database, table, iam_resource, is_table=True):
    """Registers an S3 location with Lake Formation."""
    try:
        s3_location = (glue_client.get_table(DatabaseName=database, Name=table)['Table']['StorageDescriptor']['Location']
                       if is_table else
                       glue_client.get_database(Name=database)['Database']['LocationUri'])

        bucket_name = s3_location.split('/')[2]
        registered_buckets = [res['ResourceArn'].split(':::')[1] for res in lf_client.list_resources()['ResourceInfoList']]

        if bucket_name not in registered_buckets:
            lf_client.register_resource(
                ResourceArn=f'arn:aws:s3:::{bucket_name}',
                UseServiceLinkedRole=False,
                RoleArn=LF_REGISTER_ROLE
            )
            update_lf_s3_policy(iam_client, iam_resource, bucket_name)
    except ClientError as e:
        logger.error(f"Error registering S3 location: {e}")
        raise


def grant_data_location_permissions(lf_client, glue_client, principal, database, table, is_table=True):
    """Grants Data Location Permissions to a principal."""
    try:
        s3_location = (glue_client.get_table(DatabaseName=database, Name=table)['Table']['StorageDescriptor']['Location']
                       if is_table else
                       glue_client.get_database(Name=database)['Database']['LocationUri'])

        bucket_name = s3_location.split('/')[2]

        lf_client.grant_permissions(
            Principal={'DataLakePrincipalIdentifier': principal},
            Resource={'DataLocation': {'ResourceArn': f'arn:aws:s3:::{bucket_name}'}},
            Permissions=['DATA_LOCATION_ACCESS'],
            PermissionsWithGrantOption=['DATA_LOCATION_ACCESS']
        )
    except ClientError as e:
        logger.error(f"Error granting Data Location Permissions: {e}")


def create_resource(database, table=None, wildcard=False):
    """Creates a resource dictionary for granting permissions."""
    if database and table:
        return {'Table': {'DatabaseName': database, 'Name': table}}
    elif database and wildcard:
        return {'Table': {'DatabaseName': database, 'TableWildcard': {}}}
    elif database:
        return {'Database': {'Name': database}}
    return None


def revoke_permission(lf_client, principal, permissions, database, table, wildcard):
    """Revokes permissions from a principal."""
    try:
        resource = create_resource(database, table, wildcard)
        lf_client.revoke_permissions(
            Principal={'DataLakePrincipalIdentifier': principal},
            Resource=resource,
            Permissions=permissions
        )
    except Exception as e:
        logger.error(f"Error revoking permissions for {principal}: {e}")
        raise


def lambda_handler(event, context):
    """Lambda function to process S3 event and manage Lake Formation permissions."""
    try:
        sts_client = boto3.client('sts')
        assume_role_response = sts_client.assume_role(
            RoleArn=ADMIN_ROLE_ARN,
            RoleSessionName='LFSession'
        )

        aws_session = boto3.session.Session(
            aws_access_key_id=assume_role_response['Credentials']['AccessKeyId'],
            aws_secret_access_key=assume_role_response['Credentials']['SecretAccessKey'],
            aws_session_token=assume_role_response['Credentials']['SessionToken']
        )

        s3_client = aws_session.client("s3")
        bucket_name = event['Records'][0]['s3']['bucket']['name']
        file_key = event['Records'][0]['s3']['object']['key']
        obj = s3_client.get_object(Bucket=bucket_name, Key=file_key)
        json_content = json.loads(obj["Body"].read().decode('utf-8'))

        # Extracting manifest file details
        record = json_content['Records'][0]
        access_type = record['AccessType']
        principal = record['Principal']
        database = record['Table']['DatabaseName']
        table = record['Table']['Name']
        permissions = record['Permissions']
        cross_account = record['Cross_account']

        glue_client = aws_session.client('glue')
        lf_client = aws_session.client('lakeformation')
        iam_client = aws_session.client('iam')
        iam_resource = aws_session.resource('iam')

        if access_type == 'revoke':
            revoke_permission(lf_client, principal, permissions, database, table, wildcard=False)
        else:
            register_s3_location(iam_client, s3_client, glue_client, lf_client, database, table, iam_resource)
            grant_data_location_permissions(lf_client, glue_client, principal, database, table)

    except Exception as e:
        logger.error(f"Lambda execution error: {e}")
        raise
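
The handler above validates the request, registers the S3 location, and grants data-location permissions, but the publish to the consumer queue (using CROSS_QUEUE_URL) is not shown. A minimal sketch of that step, assuming the payload fields consumer.py reads and reusing get_ram_invite() for the RAM share ARN; this is an illustration, not the exact production code:

def notify_consumer(aws_session, record, ram_arn):
    """Sketch: publish the grant/revoke request to the consumer's SQS queue."""
    sqs_client = aws_session.client('sqs')
    message = {
        'access_type': record['AccessType'],
        'cross_role': record.get('Cross_role'),
        'db_name': record['Table']['DatabaseName'],
        'table_name': record['Table'].get('Name'),
        'permissions': record['Permissions'],
        'wild_card': record['Table'].get('Wildcard', False),
        'ram_url': ram_arn  # from get_ram_invite() after the cross-account grant
    }
    sqs_client.send_message(QueueUrl=QUEUE_URL, MessageBody=json.dumps(message))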

Consumer Code – consumer.py

import boto3
import json
import logging
import os
from botocore.exceptions import ClientError

# Logger setup
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Environment Variables
ACCOUNT_A = os.environ['SRC_ACC_NUM']
ADMIN_ROLE_ARN = os.environ['ADMIN_ROLE_ARN']


def assume_role(role_arn):
    """Assume AWS IAM Role and return a session with temporary credentials."""
    sts_client = boto3.client('sts')
    try:
        response = sts_client.assume_role(RoleArn=role_arn, RoleSessionName="LFSession")
        return boto3.session.Session(
            aws_access_key_id=response['Credentials']['AccessKeyId'],
            aws_secret_access_key=response['Credentials']['SecretAccessKey'],
            aws_session_token=response['Credentials']['SessionToken']
        )
    except ClientError as e:
        logger.error(f"Error assuming role {role_arn}: {e}")
        raise


def get_ram_invite(ram_client, ram_arn):
    """Retrieve a Resource Access Manager (RAM) invitation."""
    try:
        response = ram_client.get_resource_share_invitations(resourceShareArns=[ram_arn])
        return response['resourceShareInvitations'][0]['resourceShareInvitationArn']
    except ClientError as e:
        logger.error(f"Error retrieving RAM invite: {e}")
        raise


def accept_ram_invite(ram_client, ram_invite):
    """Accept a RAM invitation."""
    try:
        ram_client.accept_resource_share_invitation(resourceShareInvitationArn=ram_invite)
    except ClientError:
        logger.info("RAM invite already accepted")


def create_database(glue_client, database_name):
    """Create a Glue database if it does not already exist."""
    try:
        glue_client.create_database(DatabaseInput={'Name': database_name})
        logger.info(f"Created database: {database_name}")
    except ClientError:
        logger.info(f"Database {database_name} already exists")


def create_resource_link_database(glue_client, rl_name, source_db, account_id):
    """Create a resource link for a shared Glue database."""
    try:
        glue_client.create_database(DatabaseInput={
            'Name': rl_name,
            "TargetDatabase": {
                "CatalogId": account_id,
                "DatabaseName": source_db
            }
        })
        logger.info(f"Created resource link database: {rl_name}")
    except ClientError:
        logger.info(f"Resource link {rl_name} already exists")


def create_resource_link_table(glue_client, rl_db, rl_table, source_db, source_table, account_id):
    """Create a resource link for a shared Glue table."""
    try:
        glue_client.create_table(
            DatabaseName=rl_db,
            TableInput={
                "Name": rl_table,
                "TargetTable": {
                    "CatalogId": account_id,
                    "DatabaseName": source_db,
                    "Name": source_table
                }
            }
        )
        logger.info(f"Created resource link table: {rl_table}")
    except ClientError:
        logger.info(f"Resource link table {rl_table} already exists")


def grant_permissions(lf_client, principal, resource, permissions):
    """Grant permissions to a principal on a specified resource."""
    try:
        lf_client.grant_permissions(
            Principal={"DataLakePrincipalIdentifier": principal},
            Resource=resource,
            Permissions=permissions,
            PermissionsWithGrantOption=permissions
        )
    except ClientError as e:
        logger.error(f"Error granting permissions to {principal}: {e}")
        raise


def revoke_permissions(lf_client, principal, resource, permissions):
    """Revoke permissions from a principal."""
    try:
        lf_client.revoke_permissions(
            Principal={"DataLakePrincipalIdentifier": principal},
            Resource=resource,
            Permissions=permissions
        )
    except ClientError as e:
        logger.error(f"Error revoking permissions from {principal}: {e}")
        raise


def construct_resource(database, table=None, wildcard=False, catalog_id=None):
    """Construct the resource dictionary for permissions."""
    if table:
        return {"Table": {"DatabaseName": database, "Name": table, **({"CatalogId": catalog_id} if catalog_id else {})}}
    elif wildcard:
        return {"Table": {"DatabaseName": database, "TableWildcard": {}}}
    else:
        return {"Database": {"Name": database}}


def lambda_handler(event, context):
    """Lambda function to process SQS messages and manage Lake Formation permissions."""
    try:
        records = [json.loads(record["body"]) for record in event['Records']]
    except (json.JSONDecodeError, KeyError) as e:
        logger.error(f"Error processing event data: {e}")
        return

    aws_session = assume_role(ADMIN_ROLE_ARN)

    # AWS Clients
    lf_client = aws_session.client('lakeformation')
    glue_client = aws_session.client('glue')
    ram_client = aws_session.client('ram')

    for record in records:
        ram_arn = record.get('ram_url')
        principal = record.get('cross_role')
        database = record.get('db_name')
        table = record.get('table_name')
        permissions = record.get('permissions', [])
        wildcard = record.get('wild_card', False)
        access_type = record.get('access_type')

        rl_database = f'rl_{database}'
        db_target = f'{database}_shared'
        table_target = f'rl_{table}'

        if access_type == 'grant':
            try:
                ram_invite = get_ram_invite(ram_client, ram_arn)
                accept_ram_invite(ram_client, ram_invite)
            except Exception as e:
                logger.error(f"Error accepting RAM invite: {e}")

        # Handle Database/Table Creation
        if database and table:
            create_database(glue_client, db_target)
            create_resource_link_table(glue_client, db_target, table_target, database, table, ACCOUNT_A)
        elif database:
            create_resource_link_database(glue_client, rl_database, database, ACCOUNT_A)

        # Handle Permissions
        try:
            resource_db = construct_resource(db_target)
            resource_table = construct_resource(db_target, table_target)

            if access_type == 'grant':
                if database and table:
                    grant_permissions(lf_client, principal, resource_db, ['ALL'])
                    grant_permissions(lf_client, principal, resource_table, permissions)
                elif database:
                    resource_wildcard = construct_resource(database, wildcard=True)
                    grant_permissions(lf_client, principal, resource_wildcard, permissions)
            else:
                if database and table:
                    revoke_permissions(lf_client, principal, resource_db, ['ALL'])
                    revoke_permissions(lf_client, principal, resource_table, permissions)
                elif database:
                    resource_wildcard = construct_resource(rl_database, wildcard=True)
                    revoke_permissions(lf_client, principal, resource_wildcard, permissions)
        except Exception as e:
            logger.error(f"Error modifying permissions: {e}")

Runtime Notes:

For the producer script:

  1. Create a Lambda function and upload the producer.py code (a sketch of wiring its S3 trigger appears after this list)
  2. Add an environment variable called ADMIN_ROLE_ARN with the Data Lake Admin role ARN as the value
  3. Add an environment variable called CROSS_QUEUE_URL with the consumer queue URL as the value
  4. Add an environment variable called LF_ROLE with the Lake Formation service role ARN for Account A as the value
  5. Add an environment variable called S3_POLICY_ARN with the custom S3 policy ARN as the value
  6. Add an environment variable called S3_POLICY_NAME with the custom S3 policy name as the value (the code reads both)
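
As referenced in step 1, here is a minimal sketch of wiring the S3 trigger with boto3. The bucket name and function ARN are placeholders, and the function must separately allow s3.amazonaws.com to invoke it:

import boto3

# Hypothetical names: fire the producer Lambda whenever a manifest.json lands in the intake bucket
s3 = boto3.client('s3')
s3.put_bucket_notification_configuration(
    Bucket='account-a-manifest-bucket',
    NotificationConfiguration={
        'LambdaFunctionConfigurations': [{
            'LambdaFunctionArn': 'arn:aws:lambda:us-east-1:111111111111:function:producer',
            'Events': ['s3:ObjectCreated:*'],
            'Filter': {'Key': {'FilterRules': [{'Name': 'suffix', 'Value': 'manifest.json'}]}}
        }]
    }
)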

For the consumer script:

  1. Create an AWS Lambda function and upload the consumer.py code (a sketch of its SQS event source mapping appears after this list)
  2. Add an environment variable called SRC_ACC_NUM with the source AWS account number as the value
  3. Add an environment variable called ADMIN_ROLE_ARN with the Data Lake Admin role ARN as the value
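
As referenced in step 1, a sketch of creating the SQS event source mapping so the consumer Lambda polls the queue (the queue ARN and function name are placeholders):

import boto3

# Hypothetical names: poll the cross-account sharing queue and invoke consumer.py
lambda_client = boto3.client('lambda')
lambda_client.create_event_source_mapping(
    EventSourceArn='arn:aws:sqs:us-east-1:112233445566:lf-sharing-queue',
    FunctionName='consumer',
    BatchSize=10
)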

Conclusion

Using AWS Lake Formation, the Glue Data Catalog, IAM, and S3 to put a Data Mesh into action gives you a flexible, secure way to spread out data ownership while keeping a close eye on governance. With the help of Lambda, SQS, and AWS Resource Access Manager (RAM), cross-account data sharing can be automated, making it simpler for organizations to control access and let teams manage their own data products without a hitch. This setup lets people run their own analytics while still meeting compliance and security requirements. As the world of data keeps changing, embracing an approach like this, both federated and well-governed, can make data easier to get to, boost teamwork, and lead to better decisions throughout a company.
