۱٫ MapReduce چیست؟
MapReduce یک الگوی برنامهنویسی برای پردازش کلان داده است که در آن دادهها به تکههای توزیع شده تقسیم میشوند و توسط یک سری تبدیل پردازش میشوند.
پارادایم برنامه نویسی MapReduce داده ها را در ۲ عملیات پردازش می کند: map() و سپس reduce(). map() یک تابع تعریف شده توسط کاربر است که هر رکورد داده را در مجموعه داده نگاشت می کند. ()reduce خروجی map() را با یک تابع تعریف شده توسط کاربر دیگر گروه بندی می کند.
۲٫ MapReduce Pipeline
MapReduce روی جفتهای (کلید، مقدار) با انجام مراحل زیر کار میکند:
- INPUT : فهرست جفتهای کلید-مقدار (k1، v1)
- نقشه : (k1, v1) >> [list of (k2, v2)]
- SHUFFLE : ترکیب (k2، v2) >> (k2، [لیست v2])
- REDUCE : (k2، [لیست v2]) >> (k3، v3)
- خروجی : لیست (k3، v3)
برای اینکه همه چیز را در چشم انداز قرار دهیم، اجازه دهید این مراحل را با یک مثال ساده از WordCount طی کنیم. فرض کنید سندی داریم که شامل دو خط است:
the article is about the mapreduce paradigmmapreduce is part of big data ecosystem
ما می خواهیم فرکانس هر کلمه در این سند را محاسبه کنیم. ما این مشکل را از پایین به بالا با شناسایی آنچه که به عنوان خروجی نهایی می خواهیم حل خواهیم کرد.
ورودی: (سند: خط متن)
خروجی: (کلمه منحصر به فرد، فرکانس)
خروجی نهایی ما لیستی از جفت (کلمه منحصر به فرد، فرکانس) است، و برای رسیدن به آن به یک خروجی میانی از هر کلمه در سند و تعداد ۱ – (کلمه، ۱) نیاز داریم. داشتن لیستی از (word, 1) به ما این امکان را می دهد که با کلمات ترکیب کنیم و تمام ۱ های یک کلمه را با هم اضافه کنیم.
سپس، SHUFFLE روی خروجی میانی ۱ انجام میشود. تمام جفتهای (کلمه، ۱) را بر اساس کلمات گروهبندی میکند و خروجی میانی ۲ را تولید میکند:
در نهایت از یک Reducer برای جمع کردن تمام ۱ ها با هم استفاده می شود که خروجی نهایی را به ما می دهد:
۳٫ برنامه MapReduce در پایتون - معرفی mrjob
mrjob کتابخانه ای است که به شما امکان می دهد برنامه های پایتون را که روی Hadoop اجرا می شوند بنویسید. با mrjob می توانید کد خود را به صورت محلی و بدون نصب Hadoop آزمایش کنید یا آن را در یک کلاستر مورد نظر خود اجرا کنید. “اگر نمی خواهید یک متخصص Hadoop باشید اما به قدرت محاسباتی MapReduce نیاز دارید، mrjob ممکن است برای شما مناسب باشد.”
ابتدا mrjob را با اجرای:
pip install mrjob
سپس MRJob را وارد کنید
from mrjob.job import MRJob
پس از آن می توانید یک کار بنویسید که توسط یک کلاس تعریف شده است و شامل یک سری مراحل است که شامل نقشه کش ها، ترکیب کننده ها و یک کاهنده است. نقشهبردارها، ترکیبکنندهها و کاهندهها اختیاری هستند، اگرچه باید حداقل یکی داشته باشید.
- مثال ۱ : با فرض اینکه یک فایل متنی به نام book.txt داریم . وظیفه ما شمارش فراوانی کلمات در ۱۰۰۰ سطر اول این سند است.
ابتدا یک کلاس به نام WordCount ایجاد می کنیم، به یاد داشته باشید که “MRJob” را در آرگومان قرار دهید. سپس یک تابع mapper و یک تابع کاهنده در داخل کلاس ایجاد می کنیم
class WordCount(MRJob): def mapper(self, key_1, value_1):
yield (key_2, value_2) def reducer(self, key_2, value_2):
yield (key_3, value_3)
در داخل تابع mapper، جفت های ورودی (کلید، مقدار) ما (_، خط) و جفت های خروجی ما باید (word، ۱) باشند. توجه داشته باشید که (_، خط) به MRJob می گوید که کلید را نادیده بگیرد و هر خط از سند را به عنوان مقدار در نظر بگیرد.
def mapper(self, _, line):
برای کلمه در خط:
if length(word) > 0:
yield (word, 1)
در داخل تابع کاهنده، جفت های ورودی (کلید، مقدار) ما (کلمه، تعداد) و جفت های خروجی ما باید (کلمه، مجموع (شمار)) باشند.
def reducer(self, word, count):
yield (word, sum(count))
سپس این کلاس کامل می شود. در نهایت فقط باید این کلاس را به یک وظیفه اختصاص دهیم.
task0 = تعداد کلمات (args = [])
برای چاپ خروجی، فایل txt را به صورت استریم میخوانیم، به این معنی که آن را سطر به ردیف میخوانیم و task0 را در هر ردیف انجام میدهیم.
class WordCount(MRJob): | |
#create a mapper() | |
def mapper(self, _, line): | |
for word in line.strip().split(‘ ‘): | |
if len(word) > 0: | |
yield (word, 1) | |
#create a reducer() | |
def reducer(self, word, count): | |
yield (word, sum(count)) | |
task0 = WordCount(args = []) | |
with open(‘book.txt’, ‘r’) as fi: | |
row = [(i,line.strip()) for i,line in enumerate(fi) if i<1000] #fetch only the first 10000 rows | |
output = list(mr.runJob(row, task0)) | |
output = sorted(output, key = lambda x: –x[1]) # sort the ouput in descending order |
۲ : ما یک فایل “citibike.csv” داریم که حاوی داده های سفرهای citibike در نیویورک است. هر ردیف از داده ها نشان دهنده سفر منحصر به فرد دوچرخه سوار از ایستگاه A به ایستگاه B است. ستون هایی که برای این کار نیاز داریم عبارتند از “start_station_name”، “end_station_name” و “gender”. هر ردیف از داده ها نشان دهنده یک سفر منحصر به فرد است.
اکنون از ما خواسته میشود ایستگاههایی را محاسبه کنیم که بیشترین سواری از آن شروع شده است، به ازای هر جنسیت. به این معنی که نام ایستگاه با بیشترین تعداد پیکاپ دوچرخه برای سوارکاران زن، برای سواران مرد و برای سواران ناشناس چه بوده است.
خروجی لیستی از تاپل ها خواهد بود که هر کدام شامل یک برچسب جنسیت (همانطور که در زیر نشان داده شده است) و یک تاپل دیگر شامل نام ایستگاه و تعداد کل سفرهای شروع شده در آن ایستگاه برای آن جنسیت است. یعنی: (۰: (ایستگاه_A، ۱۰۰))
نگاشت برچسب برای ستون جنسیت در citibike.csv این است: (‘۰’= ناشناخته ؛ ‘۱’ = مذکر ؛ ‘۲’ = زن ).
مانند قبل، باید از پایین به بالا فکر کنیم. خروجی مورد نظر ما در پایان یک تاپل تودرتو است.
#desired output
(gender_label, (max_start_station_name, max_number_of_trips))
که در آن gender_label کلید و (start_station_name, max_number_of_trips) مقدار است. این را می توان با استفاده از یک کاهنده برای استخراج حداکثر مقدار از همه مقادیر در صورتی که یک جفت کلید-مقدار متوسط داشته باشیم به دست آورد:
#intermediate output1
(gender_label, (start_station_name, number_of_trips_for_station))
در خروجی میانی ۱، gender_label را به عنوان کلید و (start_station_name، number_of_trips_for_station) را به عنوان مقدار داریم. این بدان معناست که برای هر جنسیت، تعداد پیکاپهای هر ایستگاه را شمارش میکنیم که به این صورت خواهد بود: (اعداد ساخته شده است)
(۰, (station_A, 4))
(0, (station_B, 1))
(0, (station_C, 210))
(1, (station_A, 1))
(1, (station_B, 378))
(1, (station_C, 3))
(2, (station_A, 5))
(2, (station_B, 4))
(2, (station_C, 0))
این را می توان با تغییر کلید از gender_label به جفت ( جنسیت، ایستگاه) در خروجی میانی ۲ بازنویسی کرد (نقشه برداری کرد) .
# intermediate output2
((gender_label, start_station_name), number_of_trips_for_station))
اکنون ما چیزی برای رسیدن به آسانی داریم. ما فقط میتوانیم یک رکورد از gender_label و start_station_name هر ردیف را با تعداد ۱ نگهداریم و کل تعداد را با کلید ( gender_label، start_station_name) مانند زیر جمعبندی کنیم:
این را می توان به راحتی با استفاده از یک کاهنده در رکورد ما از gender_label و start_station_name هر ردیف با تعداد ۱ انجام داد. این خروجی میانی ما ۳ خواهد بود:
# intermediate output3
((0, station_A), 1))
((0, station_A), 1))
((0, station_A), 1))
((0, station_C), 1))
((1, station_B), 1))
((0, station_B), 1))
((0, station_C), 1))
((2, station_A), 1))
((1, station_B), 1))
((1, station_C), 1))
…
برای کدنویسی این مورد در MRJob، به یک کار با ۲ مرحله و هر مرحله حاوی یک نقشهبردار و یک کاهنده نیاز داریم. برای این کار باید یک تابع step() به کلاس mrjob اضافه کنیم تا به mrjob بگوییم کدام نگاشت و کاهنده با هم استفاده می شوند.
def steps():
return [MRStep(mapper=self.mapper1,
reducer=self.reducer1),
MRStep(mapper = self.mapper2,
reducer=self.reducer2)]
اکنون میتوانیم نگاشتها و کاهندههای خود را از بالا به پایین بنویسیم. برای mapper1، ما می خواهیم به خروجی متوسط ۳ برسیم:
def mapper1(self, key, row):
yield ((row[‘gender’], row[‘start_station_name’]), 1)
سپس میتوانیم از یک کاهنده برای جمعآوری تمام شمارشها برای دستیابی به خروجی متوسط استفاده کنیم:
def reducer1(self, gender_station, count):
yield (gender_station, sum(count))
سپس از یک نگاشت برای بازسازی جفت کلید-مقدار استفاده می کنیم تا فقط جنسیت به عنوان کلید و جفت ایستگاه تعداد به عنوان مقدار داشته باشیم. این خروجی متوسط ۱ خواهد بود:
def mapper2(self, gender_station, count):
gender, station = gender_station
yield (gender, (station, count))
در نهایت فقط باید حداکثر مقدار را از جفت کلید-مقدار با اعمال یک کاهنده در خروجی میانی ۱ انتخاب کنیم:
def reducer2(self, gender, station_count):
genderMap = {‘0′:’Unknown’, ‘1’:’Male’, ‘2’:’Female’}
yield (genderMap[gender], max(station_count, key = lambda x : x[1]))
کل کد این کار:
from mrjob.step import MRStep | |
class GenderCount(MRJob): | |
def steps(self): | |
return [ | |
MRStep(mapper=self.mapper1, | |
reducer=self.reducer1), | |
MRStep(mapper = self.mapper2, | |
reducer=self.reducer2) | |
] | |
def mapper1(self, _, row): | |
yield ((row[‘gender’], row[‘start_station_name’]), 1) | |
def reducer1(self, gender_station, count): | |
yield (gender_station, sum(count)) | |
def mapper2(self, gender_station, count): | |
gender, station = gender_station | |
yield (gender, (station, count)) | |
def reducer2(self, gender, station_count): | |
genderMap = {‘0’:‘Unknown’, ‘1’:‘Male’, ‘2’:‘Female’} | |
yield (genderMap[gender], max(station_count, key = lambda x : x[1])) | |
task1 = GenderCount(args = []) | |
with open(‘citibike.csv’, ‘r’) as fi: | |
output = list(mr.runJob(enumerate(csv.DictReader(fi)), task1)) |
امیدواریم از ایجاد شغل خود لذت کافی ببرید!
دیدگاهها (0)