دیتاماینینگ یار

راهنمای Hitchhiker برای MapReduce با MRJob در پایتون

۱٫ MapReduce چیست؟

MapReduce یک الگوی برنامه‌نویسی برای پردازش کلان داده است که در آن داده‌ها به تکه‌های توزیع شده تقسیم می‌شوند و توسط یک سری تبدیل پردازش می‌شوند.

پارادایم برنامه نویسی MapReduce داده ها را در ۲ عملیات پردازش می کند: map() و سپس reduce(). map() یک تابع تعریف شده توسط کاربر است که هر رکورد داده را در مجموعه داده نگاشت می کند. ()reduce خروجی map() را با یک تابع تعریف شده توسط کاربر دیگر گروه بندی می کند.

۲٫ MapReduce Pipeline

MapReduce روی جفت‌های (کلید، مقدار) با انجام مراحل زیر کار می‌کند:

برای اینکه همه چیز را در چشم انداز قرار دهیم، اجازه دهید این مراحل را با یک مثال ساده از WordCount طی کنیم. فرض کنید سندی داریم که شامل دو خط است:

the article is about the mapreduce paradigmmapreduce is part of big data ecosystem

ما می خواهیم فرکانس هر کلمه در این سند را محاسبه کنیم. ما این مشکل را از پایین به بالا با شناسایی آنچه که به عنوان خروجی نهایی می خواهیم حل خواهیم کرد.

ورودی: (سند: خط متن)

خروجی: (کلمه منحصر به فرد، فرکانس)

خروجی نهایی ما لیستی از جفت (کلمه منحصر به فرد، فرکانس) است، و برای رسیدن به آن به یک خروجی میانی از هر کلمه در سند و تعداد ۱ – (کلمه، ۱) نیاز داریم. داشتن لیستی از (word, 1) به ما این امکان را می دهد که با کلمات ترکیب کنیم و تمام ۱ های یک کلمه را با هم اضافه کنیم.

سپس، SHUFFLE روی خروجی میانی ۱ انجام می‌شود. تمام جفت‌های (کلمه، ۱) را بر اساس کلمات گروه‌بندی می‌کند و خروجی میانی ۲ را تولید می‌کند:

در نهایت از یک Reducer برای جمع کردن تمام ۱ ها با هم استفاده می شود که خروجی نهایی را به ما می دهد:

۳٫ برنامه MapReduce در پایتون - معرفی mrjob

mrjob کتابخانه ای است که به شما امکان می دهد برنامه های پایتون را که روی Hadoop اجرا می شوند بنویسید. با mrjob می توانید کد خود را به صورت محلی و بدون نصب Hadoop آزمایش کنید یا آن را در یک کلاستر مورد نظر خود اجرا کنید. “اگر نمی خواهید یک متخصص Hadoop باشید اما به قدرت محاسباتی MapReduce نیاز دارید، mrjob ممکن است برای شما مناسب باشد.”

ابتدا mrjob را با اجرای:

pip install mrjob

سپس MRJob را وارد کنید

from mrjob.job import MRJob

پس از آن می توانید یک کار بنویسید که توسط یک کلاس تعریف شده است و شامل یک سری مراحل است که شامل نقشه کش ها، ترکیب کننده ها و یک کاهنده است. نقشه‌بردارها، ترکیب‌کننده‌ها و کاهنده‌ها اختیاری هستند، اگرچه باید حداقل یکی داشته باشید.

ابتدا یک کلاس به نام WordCount ایجاد می کنیم، به یاد داشته باشید که “MRJob” را در آرگومان قرار دهید. سپس یک تابع mapper و یک تابع کاهنده در داخل کلاس ایجاد می کنیم

class WordCount(MRJob): def mapper(self, key_1, value_1):
yield (key_2, value_2)
def reducer(self, key_2, value_2):
yield (key_3, value_3)

در داخل تابع mapper، جفت های ورودی (کلید، مقدار) ما (_، خط) و جفت های خروجی ما باید (word، ۱) باشند. توجه داشته باشید که (_، خط) به MRJob می گوید که کلید را نادیده بگیرد و هر خط از سند را به عنوان مقدار در نظر بگیرد.

def mapper(self, _, line):
برای کلمه در خط:
  if length(word) > 0:
   yield (word, 1)

در داخل تابع کاهنده، جفت های ورودی (کلید، مقدار) ما (کلمه، تعداد) و جفت های خروجی ما باید (کلمه، مجموع (شمار)) باشند.

def reducer(self, word, count):
yield (word, sum(count))

سپس این کلاس کامل می شود. در نهایت فقط باید این کلاس را به یک وظیفه اختصاص دهیم.

task0 = تعداد کلمات (args = [])

برای چاپ خروجی، فایل txt را به صورت استریم می‌خوانیم، به این معنی که آن را سطر به ردیف می‌خوانیم و task0 را در هر ردیف انجام می‌دهیم.

class WordCount(MRJob):
 #create a mapper()
 def mapper(self, _, line):
 for word in line.strip().split(‘ ‘):
 if len(word) > 0:
 yield (word, 1)
  
 #create a reducer()
 def reducer(self, word, count):
 yield (word, sum(count))
  
 task0 = WordCount(args = [])
 with open(‘book.txt’, ‘r’) as fi:
 row = [(i,line.strip()) for i,line in enumerate(fi) if i<1000] #fetch only the first 10000 rows
 output = list(mr.runJob(row, task0))
 output = sorted(output, key = lambda x: x[1]) # sort the ouput in descending order

۲ : ما یک فایل “citibike.csv” داریم که حاوی داده های سفرهای citibike در نیویورک است. هر ردیف از داده ها نشان دهنده سفر منحصر به فرد دوچرخه سوار از ایستگاه A به ایستگاه B است. ستون هایی که برای این کار نیاز داریم عبارتند از “start_station_name”، “end_station_name” و “gender”. هر ردیف از داده ها نشان دهنده یک سفر منحصر به فرد است.

اکنون از ما خواسته می‌شود ایستگاه‌هایی را محاسبه کنیم که بیشترین سواری از آن شروع شده است، به ازای هر جنسیت. به این معنی که نام ایستگاه با بیشترین تعداد پیکاپ دوچرخه برای سوارکاران زن، برای سواران مرد و برای سواران ناشناس چه بوده است.

خروجی لیستی از تاپل ها خواهد بود که هر کدام شامل یک برچسب جنسیت (همانطور که در زیر نشان داده شده است) و یک تاپل دیگر شامل نام ایستگاه و تعداد کل سفرهای شروع شده در آن ایستگاه برای آن جنسیت است. یعنی: (۰: (ایستگاه_A، ۱۰۰))

نگاشت برچسب برای ستون جنسیت در citibike.csv این است: (‘۰’= ناشناخته ؛ ‘۱’ = مذکر ؛ ‘۲’ = زن ).

مانند قبل، باید از پایین به بالا فکر کنیم. خروجی مورد نظر ما در پایان یک تاپل تودرتو است.

#desired output
(gender_label, (max_start_station_name, max_number_of_trips))

 

که در آن gender_label کلید و (start_station_name, max_number_of_trips) مقدار است. این را می توان با استفاده از یک کاهنده برای استخراج حداکثر مقدار از همه مقادیر در صورتی که یک جفت کلید-مقدار متوسط ​​داشته باشیم به دست آورد:

#intermediate output1
(gender_label, (start_station_name, number_of_trips_for_station))

 

در خروجی میانی ۱، gender_label را به عنوان کلید و (start_station_name، number_of_trips_for_station) را به عنوان مقدار داریم. این بدان معناست که برای هر جنسیت، تعداد پیکاپ‌های هر ایستگاه را شمارش می‌کنیم که به این صورت خواهد بود: (اعداد ساخته شده است)

 

(۰, (station_A, 4))
(0, (station_B, 1))
(0, (station_C, 210))
(1, (station_A, 1))
(1, (station_B, 378))
(1, (station_C, 3))
(2, (station_A, 5))
(2, (station_B, 4))
(2, (station_C, 0))

این را می توان با تغییر کلید از gender_label به جفت ( جنسیت، ایستگاه) در خروجی میانی ۲ بازنویسی کرد (نقشه برداری کرد) .

# intermediate output2
((gender_label, start_station_name), number_of_trips_for_station))

 

 

اکنون ما چیزی برای رسیدن به آسانی داریم. ما فقط می‌توانیم یک رکورد از gender_label و start_station_name هر ردیف را با تعداد ۱ نگه‌داریم و کل تعداد را با کلید ( gender_label، start_station_name) مانند زیر جمع‌بندی کنیم:

((۰, station_A), 1 1 1 1 ))
((0, station_B), 1))
((0, station_C), 1 1 1 …))
((1, station_A), 1))
((1, station_B), 1 1 1 …))
((1, station_C), 1 1 1))
((2, station_A), 1 1 1 1 1))
((2, station_B), 1 1 1 1))
((2, station_C), ))

این را می توان به راحتی با استفاده از یک کاهنده در رکورد ما از gender_label و start_station_name هر ردیف با تعداد ۱ انجام داد. این خروجی میانی ما ۳ خواهد بود:

# intermediate output3
((0, station_A), 1))
((0, station_A), 1))
((0, station_A), 1))
((0, station_C), 1))
((1, station_B), 1))
((0, station_B), 1))
((0, station_C), 1))
((2, station_A), 1))
((1, station_B), 1))
((1, station_C), 1))

برای کدنویسی این مورد در MRJob، به یک کار با ۲ مرحله و هر مرحله حاوی یک نقشه‌بردار و یک کاهنده نیاز داریم. برای این کار باید یک تابع step() به کلاس mrjob اضافه کنیم تا به mrjob بگوییم کدام نگاشت و کاهنده با هم استفاده می شوند.

 

def steps():
return [MRStep(mapper=self.mapper1,
reducer=self.reducer1),
MRStep(mapper = self.mapper2,
reducer=self.reducer2)]

اکنون می‌توانیم نگاشت‌ها و کاهنده‌های خود را از بالا به پایین بنویسیم. برای mapper1، ما می خواهیم به خروجی متوسط ​​۳ برسیم:

def mapper1(self, key, row):
yield ((row[‘gender’], row[‘start_station_name’]), 1)

def reducer1(self, gender_station, count):
yield (gender_station, sum(count))

سپس از یک نگاشت برای بازسازی جفت کلید-مقدار استفاده می کنیم تا فقط جنسیت به عنوان کلید و جفت ایستگاه تعداد به عنوان مقدار داشته باشیم. این خروجی متوسط ​​۱ خواهد بود:

def mapper2(self, gender_station, count):
gender, station = gender_station
yield (gender, (station, count))

def reducer2(self, gender, station_count):
genderMap = {‘0′:’Unknown’, ‘1’:’Male’, ‘2’:’Female’}
yield (genderMap[gender], max(station_count, key = lambda x : x[1]))

from mrjob.step import MRStep
  
 class GenderCount(MRJob):
 def steps(self):
 return [
 MRStep(mapper=self.mapper1,
 reducer=self.reducer1),
 MRStep(mapper = self.mapper2,
 reducer=self.reducer2)
 ]
 def mapper1(self, _, row):
 yield ((row[‘gender’], row[‘start_station_name’]), 1)
  
 def reducer1(self, gender_station, count):
 yield (gender_station, sum(count))
  
 def mapper2(self, gender_station, count):
 gender, station = gender_station
 yield (gender, (station, count))
  
 def reducer2(self, gender, station_count):
 genderMap = {‘0’:‘Unknown’, ‘1’:‘Male’, ‘2’:‘Female’}
 yield (genderMap[gender], max(station_count, key = lambda x : x[1]))
  
  
 task1 = GenderCount(args = [])
 with open(‘citibike.csv’, ‘r’) as fi:
 output = list(mr.runJob(enumerate(csv.DictReader(fi)), task1))

امیدواریم از ایجاد شغل خود لذت کافی ببرید!

ارتباط و مشاوره با شما ۰۹۳۶۷۹۳۸۰۱۸ در واتس اپ

دیدگاه‌ها (0)

*
*


پاسخ من را به ایمیلم ارسال کن

error: با عرض پوزش؛ لطفا از مطالعه مطالب لذت ببرید.