Data Engineering sur AWS : Guide Complet
Découvrez les services AWS pour le traitement et l'analyse de données à grande échelle.
I
InSkillCoach
· min
Data Engineering sur AWS : Guide Complet
Découvrez les services AWS pour le traitement et l’analyse de données à grande échelle.
1. Ingestion de Données avec Amazon Kinesis
Configuration du Stream
# Exemple de configuration Kinesis
import boto3
def setup_kinesis_stream():
kinesis = boto3.client('kinesis')
# Création du stream
stream = kinesis.create_stream(
StreamName='MonStream',
ShardCount=2
)
# Attente de la création
waiter = kinesis.get_waiter('stream_exists')
waiter.wait(StreamName='MonStream')
return stream['StreamName']
Envoi de Données
# Exemple d'envoi de données vers Kinesis
def send_data_to_kinesis():
kinesis = boto3.client('kinesis')
data = {
'timestamp': '2024-03-22T10:00:00',
'value': 42,
'type': 'metric'
}
response = kinesis.put_record(
StreamName='MonStream',
Data=json.dumps(data),
PartitionKey='metric'
)
return response['ShardId']
2. Traitement avec Amazon EMR
Configuration du Cluster
# Exemple de configuration EMR
def setup_emr_cluster():
emr = boto3.client('emr')
cluster_config = {
'Name': 'MonClusterEMR',
'ReleaseLabel': 'emr-6.10.0',
'Applications': [
{'Name': 'Spark'},
{'Name': 'Hadoop'}
],
'Instances': {
'InstanceGroups': [
{
'Name': 'Master',
'Market': 'ON_DEMAND',
'InstanceRole': 'MASTER',
'InstanceType': 'm5.xlarge',
'InstanceCount': 1
},
{
'Name': 'Core',
'Market': 'ON_DEMAND',
'InstanceRole': 'CORE',
'InstanceType': 'm5.xlarge',
'InstanceCount': 2
}
],
'KeepJobFlowAliveWhenNoSteps': True,
'TerminationProtected': False
}
}
response = emr.run_job_flow(**cluster_config)
return response['JobFlowId']
Exécution de Job Spark
# Exemple d'exécution de job Spark
def run_spark_job():
emr = boto3.client('emr')
step_config = {
'Name': 'MonJobSpark',
'ActionOnFailure': 'CONTINUE',
'HadoopJarStep': {
'Jar': 'command-runner.jar',
'Args': [
'spark-submit',
'--class', 'com.example.MainClass',
's3://mon-bucket/jobs/mon-job.jar'
]
}
}
response = emr.add_job_flow_steps(
JobFlowId='j-XXXXX',
Steps=[step_config]
)
return response['StepIds']
3. Stockage avec Amazon S3 et Amazon Redshift
Configuration Redshift
# Exemple de configuration Redshift
def setup_redshift_cluster():
redshift = boto3.client('redshift')
cluster_config = {
'ClusterIdentifier': 'mon-cluster',
'NodeType': 'dc2.large',
'NumberOfNodes': 2,
'MasterUsername': 'admin',
'MasterUserPassword': 'password123',
'DBName': 'mon_database',
'ClusterType': 'multi-node',
'VpcSecurityGroupIds': ['sg-xxxxx'],
'ClusterSubnetGroupName': 'mon-subnet-group'
}
response = redshift.create_cluster(**cluster_config)
return response['Cluster']['ClusterIdentifier']
Chargement de Données
# Exemple de chargement de données dans Redshift
def load_data_to_redshift():
redshift = boto3.client('redshift-data')
query = """
COPY mon_table
FROM 's3://mon-bucket/data/'
IAM_ROLE 'arn:aws:iam::account:role/RedshiftCopyRole'
CSV
"""
response = redshift.execute_statement(
ClusterIdentifier='mon-cluster',
Database='mon_database',
DbUser='admin',
Sql=query
)
return response['Id']
4. Transformation avec AWS Glue
Configuration du Job
# Exemple de configuration Glue
def setup_glue_job():
glue = boto3.client('glue')
job_config = {
'Name': 'MonJobGlue',
'Role': 'arn:aws:iam::account:role/AWSGlueServiceRole',
'Command': {
'Name': 'glueetl',
'ScriptLocation': 's3://mon-bucket/scripts/mon-script.py',
'PythonVersion': '3'
},
'DefaultArguments': {
'--job-language': 'python',
'--TempDir': 's3://mon-bucket/temp/',
'--job-bookmark-option': 'job-bookmark-enable'
},
'MaxRetries': 0,
'Timeout': 2880,
'MaxCapacity': 10.0
}
response = glue.create_job(**job_config)
return response['Name']
Script ETL
# Exemple de script ETL Glue
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
def main():
# Initialisation du contexte
glueContext = GlueContext(SparkContext.getOrCreate())
# Création des DynamicFrames
source_dyf = glueContext.create_dynamic_frame.from_catalog(
database="mon_database",
table_name="source_table"
)
# Transformation des données
transformed_dyf = source_dyf.map(
lambda x: {
'id': x['id'],
'value': x['value'] * 2,
'timestamp': x['timestamp']
}
)
# Écriture des résultats
glueContext.write_dynamic_frame.from_options(
frame=transformed_dyf,
connection_type="s3",
connection_options={
"path": "s3://mon-bucket/output/"
},
format="parquet"
)
if __name__ == "__main__":
main()
5. Visualisation avec Amazon QuickSight
Configuration du Dataset
# Exemple de configuration QuickSight
def setup_quicksight_dataset():
quicksight = boto3.client('quicksight')
dataset_config = {
'AwsAccountId': '123456789012',
'DataSetId': 'mon-dataset',
'Name': 'MonDataset',
'PhysicalTableMap': {
'Table1': {
'S3Source': {
'DataSourceArn': 'arn:aws:quicksight:region:account:datasource/mon-source',
'InputColumns': [
{
'Name': 'id',
'Type': 'STRING'
},
{
'Name': 'value',
'Type': 'INTEGER'
}
]
}
}
},
'LogicalTableMap': {
'Table1': {
'Alias': 'MonTable',
'DataTransforms': [],
'Source': {
'PhysicalTableId': 'Table1'
}
}
}
}
response = quicksight.create_data_set(**dataset_config)
return response['DataSetId']
Création de Dashboard
# Exemple de création de dashboard QuickSight
def create_quicksight_dashboard():
quicksight = boto3.client('quicksight')
dashboard_config = {
'AwsAccountId': '123456789012',
'DashboardId': 'mon-dashboard',
'Name': 'MonDashboard',
'SourceEntity': {
'SourceTemplate': {
'DataSetReferences': [
{
'DataSetPlaceholder': 'MonDataset',
'DataSetArn': 'arn:aws:quicksight:region:account:dataset/mon-dataset'
}
],
'Arn': 'arn:aws:quicksight:region:account:template/mon-template'
}
}
}
response = quicksight.create_dashboard(**dashboard_config)
return response['DashboardId']
Conclusion
Points clés à retenir :
- Ingérer des données en temps réel avec Kinesis
- Traiter les données avec EMR
- Stocker et analyser avec Redshift
- Transformer avec Glue
- Visualiser avec QuickSight
Recommandations :
- Choisir les bons services selon les besoins
- Optimiser les coûts de traitement
- Mettre en place des pipelines ETL robustes
- Surveiller les performances
- Sécuriser les données
À propos de InSkillCoach
Expert en formation et technologies
Coach spécialisé dans les technologies avancées et l'IA, porté par GNeurone Inc.
Certifications:
- AWS Certified Solutions Architect – Professional
- Certifications Google Cloud
- Microsoft Certified: DevOps Engineer Expert
- Certified Kubernetes Administrator (CKA)
- CompTIA Security+
630
101
Commentaires
Les commentaires sont alimentés par GitHub Discussions
Connectez-vous avec GitHub pour participer à la discussion