0%
Data Engineering sur AWS : Guide Complet

Data Engineering sur AWS : Guide Complet

Découvrez les services AWS pour le traitement et l'analyse de données à grande échelle.

I

InSkillCoach

· min

Data Engineering sur AWS : Guide Complet

Découvrez les services AWS pour le traitement et l’analyse de données à grande échelle.

1. Ingestion de Données avec Amazon Kinesis

Configuration du Stream

# Exemple de configuration Kinesis
import boto3

def setup_kinesis_stream():
    kinesis = boto3.client('kinesis')
    
    # Création du stream
    stream = kinesis.create_stream(
        StreamName='MonStream',
        ShardCount=2
    )
    
    # Attente de la création
    waiter = kinesis.get_waiter('stream_exists')
    waiter.wait(StreamName='MonStream')
    
    return stream['StreamName']

Envoi de Données

# Exemple d'envoi de données vers Kinesis
def send_data_to_kinesis():
    kinesis = boto3.client('kinesis')
    
    data = {
        'timestamp': '2024-03-22T10:00:00',
        'value': 42,
        'type': 'metric'
    }
    
    response = kinesis.put_record(
        StreamName='MonStream',
        Data=json.dumps(data),
        PartitionKey='metric'
    )
    
    return response['ShardId']

2. Traitement avec Amazon EMR

Configuration du Cluster

# Exemple de configuration EMR
def setup_emr_cluster():
    emr = boto3.client('emr')
    
    cluster_config = {
        'Name': 'MonClusterEMR',
        'ReleaseLabel': 'emr-6.10.0',
        'Applications': [
            {'Name': 'Spark'},
            {'Name': 'Hadoop'}
        ],
        'Instances': {
            'InstanceGroups': [
                {
                    'Name': 'Master',
                    'Market': 'ON_DEMAND',
                    'InstanceRole': 'MASTER',
                    'InstanceType': 'm5.xlarge',
                    'InstanceCount': 1
                },
                {
                    'Name': 'Core',
                    'Market': 'ON_DEMAND',
                    'InstanceRole': 'CORE',
                    'InstanceType': 'm5.xlarge',
                    'InstanceCount': 2
                }
            ],
            'KeepJobFlowAliveWhenNoSteps': True,
            'TerminationProtected': False
        }
    }
    
    response = emr.run_job_flow(**cluster_config)
    return response['JobFlowId']

Exécution de Job Spark

# Exemple d'exécution de job Spark
def run_spark_job():
    emr = boto3.client('emr')
    
    step_config = {
        'Name': 'MonJobSpark',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': [
                'spark-submit',
                '--class', 'com.example.MainClass',
                's3://mon-bucket/jobs/mon-job.jar'
            ]
        }
    }
    
    response = emr.add_job_flow_steps(
        JobFlowId='j-XXXXX',
        Steps=[step_config]
    )
    
    return response['StepIds']

3. Stockage avec Amazon S3 et Amazon Redshift

Configuration Redshift

# Exemple de configuration Redshift
def setup_redshift_cluster():
    redshift = boto3.client('redshift')
    
    cluster_config = {
        'ClusterIdentifier': 'mon-cluster',
        'NodeType': 'dc2.large',
        'NumberOfNodes': 2,
        'MasterUsername': 'admin',
        'MasterUserPassword': 'password123',
        'DBName': 'mon_database',
        'ClusterType': 'multi-node',
        'VpcSecurityGroupIds': ['sg-xxxxx'],
        'ClusterSubnetGroupName': 'mon-subnet-group'
    }
    
    response = redshift.create_cluster(**cluster_config)
    return response['Cluster']['ClusterIdentifier']

Chargement de Données

# Exemple de chargement de données dans Redshift
def load_data_to_redshift():
    redshift = boto3.client('redshift-data')
    
    query = """
    COPY mon_table
    FROM 's3://mon-bucket/data/'
    IAM_ROLE 'arn:aws:iam::account:role/RedshiftCopyRole'
    CSV
    """
    
    response = redshift.execute_statement(
        ClusterIdentifier='mon-cluster',
        Database='mon_database',
        DbUser='admin',
        Sql=query
    )
    
    return response['Id']

4. Transformation avec AWS Glue

Configuration du Job

# Exemple de configuration Glue
def setup_glue_job():
    glue = boto3.client('glue')
    
    job_config = {
        'Name': 'MonJobGlue',
        'Role': 'arn:aws:iam::account:role/AWSGlueServiceRole',
        'Command': {
            'Name': 'glueetl',
            'ScriptLocation': 's3://mon-bucket/scripts/mon-script.py',
            'PythonVersion': '3'
        },
        'DefaultArguments': {
            '--job-language': 'python',
            '--TempDir': 's3://mon-bucket/temp/',
            '--job-bookmark-option': 'job-bookmark-enable'
        },
        'MaxRetries': 0,
        'Timeout': 2880,
        'MaxCapacity': 10.0
    }
    
    response = glue.create_job(**job_config)
    return response['Name']

Script ETL

# Exemple de script ETL Glue
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame

def main():
    # Initialisation du contexte
    glueContext = GlueContext(SparkContext.getOrCreate())
    
    # Création des DynamicFrames
    source_dyf = glueContext.create_dynamic_frame.from_catalog(
        database="mon_database",
        table_name="source_table"
    )
    
    # Transformation des données
    transformed_dyf = source_dyf.map(
        lambda x: {
            'id': x['id'],
            'value': x['value'] * 2,
            'timestamp': x['timestamp']
        }
    )
    
    # Écriture des résultats
    glueContext.write_dynamic_frame.from_options(
        frame=transformed_dyf,
        connection_type="s3",
        connection_options={
            "path": "s3://mon-bucket/output/"
        },
        format="parquet"
    )

if __name__ == "__main__":
    main()

5. Visualisation avec Amazon QuickSight

Configuration du Dataset

# Exemple de configuration QuickSight
def setup_quicksight_dataset():
    quicksight = boto3.client('quicksight')
    
    dataset_config = {
        'AwsAccountId': '123456789012',
        'DataSetId': 'mon-dataset',
        'Name': 'MonDataset',
        'PhysicalTableMap': {
            'Table1': {
                'S3Source': {
                    'DataSourceArn': 'arn:aws:quicksight:region:account:datasource/mon-source',
                    'InputColumns': [
                        {
                            'Name': 'id',
                            'Type': 'STRING'
                        },
                        {
                            'Name': 'value',
                            'Type': 'INTEGER'
                        }
                    ]
                }
            }
        },
        'LogicalTableMap': {
            'Table1': {
                'Alias': 'MonTable',
                'DataTransforms': [],
                'Source': {
                    'PhysicalTableId': 'Table1'
                }
            }
        }
    }
    
    response = quicksight.create_data_set(**dataset_config)
    return response['DataSetId']

Création de Dashboard

# Exemple de création de dashboard QuickSight
def create_quicksight_dashboard():
    quicksight = boto3.client('quicksight')
    
    dashboard_config = {
        'AwsAccountId': '123456789012',
        'DashboardId': 'mon-dashboard',
        'Name': 'MonDashboard',
        'SourceEntity': {
            'SourceTemplate': {
                'DataSetReferences': [
                    {
                        'DataSetPlaceholder': 'MonDataset',
                        'DataSetArn': 'arn:aws:quicksight:region:account:dataset/mon-dataset'
                    }
                ],
                'Arn': 'arn:aws:quicksight:region:account:template/mon-template'
            }
        }
    }
    
    response = quicksight.create_dashboard(**dashboard_config)
    return response['DashboardId']

Conclusion

Points clés à retenir :

  • Ingérer des données en temps réel avec Kinesis
  • Traiter les données avec EMR
  • Stocker et analyser avec Redshift
  • Transformer avec Glue
  • Visualiser avec QuickSight

Recommandations :

  • Choisir les bons services selon les besoins
  • Optimiser les coûts de traitement
  • Mettre en place des pipelines ETL robustes
  • Surveiller les performances
  • Sécuriser les données
InSkillCoach

À propos de InSkillCoach

Expert en formation et technologies

Coach spécialisé dans les technologies avancées et l'IA, porté par GNeurone Inc.

Certifications:

  • AWS Certified Solutions Architect – Professional
  • Certifications Google Cloud
  • Microsoft Certified: DevOps Engineer Expert
  • Certified Kubernetes Administrator (CKA)
  • CompTIA Security+
630
101

Commentaires

Les commentaires sont alimentés par GitHub Discussions

Connectez-vous avec GitHub pour participer à la discussion

Lien copié !