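Implementing Matrix Multiplication with Hadoop
We all learned matrix multiplication at university. Two matrices A and B can be multiplied when the number of columns of A equals the number of rows of B, and each entry of the product is obtained by multiplying a row of A by a column of B, as shown below. Today we implement matrix multiplication with Hadoop.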
$$
\left[
\begin{matrix}
A1 & A2 & A3 \\
A4 & A5 & A6 \\
A7 & A8 & A9
\end{matrix}
\right]
\times
\left[
\begin{matrix}
B1 & B2 & B3 \\
B4 & B5 & B6 \\
B7 & B8 & B9
\end{matrix}
\right]
$$
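The computation multiplies rows of A by columns of B. We can first transpose B (swap its rows and columns) and then multiply rows of A by rows of the transposed matrix B', which gives the same result. The steps are:
1. Transpose B (the right-hand matrix) and output the result B'.
2. Multiply the rows of A by the rows of B' (B' is placed in Hadoop's distributed cache) and output the result.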
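To see why the transposition works, the entrywise definition of the product can be rewritten in terms of B'. This is only a short derivation; the product name $C$, the inner dimension $n$ and the indices $i, j, k$ are notation introduced here, not taken from the original text:

$$
C_{ij} = \sum_{k=1}^{n} A_{ik} B_{kj} = \sum_{k=1}^{n} A_{ik} B'_{jk}, \qquad \text{since } B'_{jk} = B_{kj}
$$

So each entry $C_{ij}$ is the dot product of row $i$ of A and row $j$ of B'. Both operands are then stored row-wise, which matches the one-row-per-line file layout used in the rest of this post.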
Let's first look at the two matrices used in the example.
$$
\left[
\begin{matrix}
1 & 2 & -1 \\
2 & 1 & 3 \\
0 & 3 & 1
\end{matrix}
\right]\tag{A}
$$
$$
\left[
\begin{matrix}
1 & 2 & 3 \\
3 & -1 & 0 \\
-4 & 2 & 1
\end{matrix}
\right]\tag{B}
$$
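We store the data in HDFS files in the following form (matrix A is shown here; the row number and the row's data are separated by a tab, because the code splits on "\t"):
1	1_1,2_2,3_-1
2	1_2,2_1,3_3
3	1_0,2_3,3_1
The leftmost field of each line is the row number and the rest is that row's data. In an entry such as "1_1", the part before the underscore is the column number and the part after it is the value.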
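The line format described above can be illustrated with a small standalone parser. This is a minimal sketch, not part of the original jobs: the class `MatrixLineParser` and its methods are hypothetical names, and it assumes integer values and a tab between the row number and the entries.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not part of the original jobs): parses one line of the
// matrix file format "row<TAB>col_value,col_value,...".
class MatrixLineParser {

    // Returns the row number stored before the tab.
    static int parseRow(String line) {
        return Integer.parseInt(line.split("\t")[0]);
    }

    // Returns a map of column number -> value for the entries after the tab.
    static Map<Integer, Integer> parseEntries(String line) {
        Map<Integer, Integer> entries = new LinkedHashMap<>();
        for (String entry : line.split("\t")[1].split(",")) {
            // "3_-1" splits into column "3" and value "-1".
            String[] parts = entry.split("_");
            entries.put(Integer.parseInt(parts[0]), Integer.parseInt(parts[1]));
        }
        return entries;
    }

    public static void main(String[] args) {
        String line = "1\t1_1,2_2,3_-1";
        System.out.println(parseRow(line) + " -> " + parseEntries(line));
    }
}
```

Storing the column number next to each value means a row does not have to list every column, so zero entries could in principle be left out of the file.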
Code:
Step 1: Transpose B (the right-hand matrix) and output the result B'
Map phase:
Read in the data of the right matrix
```java
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MapMatrixTranspose extends Mapper<LongWritable, Text, Text, Text> {

    private Text outKey = new Text();
    private Text outValue = new Text();

    @Override
    protected void map(LongWritable key, Text values, Context context)
            throws IOException, InterruptedException {
        // Input line: "row<TAB>col_value,col_value,..."
        String[] rowAndLines = values.toString().split("\t");
        String row = rowAndLines[0];
        String[] lines = rowAndLines[1].split(",");
        for (String line : lines) {
            String[] columnAndValue = line.split("_");
            String column = columnAndValue[0];
            String value = columnAndValue[1];
            // Emit (column, row_value): the column of B becomes the row of B'.
            outKey.set(column);
            outValue.set(row + "_" + value);
            context.write(outKey, outValue);
        }
    }
}
```
Reduce phase:
```java
import java.io.IOException;
import java.util.Objects;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class ReduceMatrixTranspose extends Reducer<Text, Text, Text, Text> {

    private Text outKey = new Text();
    private Text outValues = new Text();

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Join all "row_value" entries of this column with commas.
        StringBuilder sb = new StringBuilder();
        for (Text text : values) {
            sb.append(text).append(",");
        }
        // Drop the trailing comma.
        String result = null;
        if (sb.toString().endsWith(",")) {
            result = sb.substring(0, sb.length() - 1);
        }

        outKey.set(key);
        outValues.set(Objects.requireNonNull(result));
        context.write(outKey, outValues);
    }
}
```
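The map and reduce phases above can be tried out locally without a cluster. The following is a rough simulation under stated assumptions, not the Hadoop job itself: `LocalTranspose` is a hypothetical class, and a sorted map stands in for the shuffle that groups mapper output by key.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical local simulation of the transpose job; no Hadoop classes are used.
class LocalTranspose {

    static List<String> transpose(List<String> lines) {
        // "Map" + "shuffle": emit (column, row_value) pairs and group them by column.
        Map<Integer, List<String>> grouped = new TreeMap<>();
        for (String line : lines) {
            String[] rowAndEntries = line.split("\t");
            String row = rowAndEntries[0];
            for (String entry : rowAndEntries[1].split(",")) {
                String[] parts = entry.split("_");
                grouped.computeIfAbsent(Integer.parseInt(parts[0]), k -> new ArrayList<>())
                       .add(row + "_" + parts[1]);
            }
        }
        // "Reduce": join the values of each group with commas.
        List<String> out = new ArrayList<>();
        for (Map.Entry<Integer, List<String>> e : grouped.entrySet()) {
            out.add(e.getKey() + "\t" + String.join(",", e.getValue()));
        }
        return out;
    }

    public static void main(String[] args) {
        // Matrix B from the example, one row per line.
        List<String> b = Arrays.asList("1\t1_1,2_2,3_3", "2\t1_3,2_-1,3_0", "3\t1_-4,2_2,3_1");
        transpose(b).forEach(System.out::println);
    }
}
```

In this simulation the entries of each output row appear in input order. Hadoop does not guarantee the order of the values passed to a reducer, so the real job may list the same entries in a different order; this does not matter because every entry carries its own index.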
Main:
```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Transpose {

    private static String hdfs = "hdfs://localhost:9000";

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", hdfs);
        try {
            Job job = Job.getInstance(conf, "step1");
            job.setJarByClass(Transpose.class);
            job.setMapperClass(MapMatrixTranspose.class);
            job.setReducerClass(ReduceMatrixTranspose.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            // args[0]: input path of matrix B, args[1]: output path for B'
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            if (job.waitForCompletion(true)) {
                System.out.println("matrix transpose success");
            } else {
                System.out.println("matrix transpose fail");
            }
        } catch (Exception e) {
            System.out.println("Execution error: " + e.getMessage());
        }
    }
}
```
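Then package the code into a jar, put the data of the right matrix into HDFS, and run the hadoop command.
```
hadoop jar matrix.jar <input file path> <output path>
```
Then check the output. The result is:
```
1	3_-4,2_3,1_1
2	3_2,2_-1,1_2
3	3_1,2_0,1_3
```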
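Step 2: Multiply the rows of A by the rows of B' (B' is placed in Hadoop's distributed cache) and output the result
Map phase:
Here the mapper reads the right matrix from the distributed cache under the name right_matrix. This alias is set in the main method; it is used as follows: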
```java
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MapMatrixMultiply extends Mapper<LongWritable, Text, Text, Text> {

    private Text outKey = new Text();
    private Text outValue = new Text();
    private List<String> cacheList = new ArrayList<>();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        super.setup(context);
        // "right_matrix" is the alias given to the cached file in the main method.
        try (FileReader fr = new FileReader("right_matrix");
             BufferedReader br = new BufferedReader(fr)) {
            String line;
            while ((line = br.readLine()) != null) {
                cacheList.add(line);
            }
        }
    }

    @Override
    protected void map(LongWritable key, Text values, Context context)
            throws IOException, InterruptedException {
        // One row of the left matrix A: "row<TAB>col_value,col_value,..."
        String[] rowAndEntries1 = values.toString().split("\t");
        String rowMatrix1 = rowAndEntries1[0];
        String[] columnValueArrayMatrix1 = rowAndEntries1[1].split(",");
        for (String line : cacheList) {
            // One row of B', i.e. one column of the original right matrix B.
            String[] rowAndEntries2 = line.split("\t");
            String rowMatrix2 = rowAndEntries2[0];
            String[] columnValueArrayMatrix2 = rowAndEntries2[1].split(",");

            // Dot product: multiply the entries that share a column number.
            int result = 0;
            for (String columnValueMatrix1 : columnValueArrayMatrix1) {
                String columnMatrix1 = columnValueMatrix1.split("_")[0];
                String columnValue1 = columnValueMatrix1.split("_")[1];
                for (String columnValueMatrix2 : columnValueArrayMatrix2) {
                    if (columnValueMatrix2.startsWith(columnMatrix1 + "_")) {
                        String columnValue2 = columnValueMatrix2.split("_")[1];
                        result += Integer.parseInt(columnValue1) * Integer.parseInt(columnValue2);
                    }
                }
            }

            // Key: row of A. Value: "column of the product_value".
            outKey.set(rowMatrix1);
            outValue.set(rowMatrix2 + "_" + result);
            context.write(outKey, outValue);
        }
    }
}
```
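The inner computation of the mapper above is a dot product between one row of A and one row of B', matched on column number. As an illustration only, here is a standalone sketch that replaces the nested `startsWith` loop with a hash-map lookup. This is a swapped-in variant, not the author's code: `SparseRowDot`, `toMap` and `dot` are hypothetical names, and integer values are assumed.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper illustrating the per-row computation of the multiply mapper.
class SparseRowDot {

    // Parses "col_value,col_value,..." into a map of column -> value.
    static Map<String, Integer> toMap(String entries) {
        Map<String, Integer> m = new HashMap<>();
        for (String entry : entries.split(",")) {
            String[] parts = entry.split("_");
            m.put(parts[0], Integer.parseInt(parts[1]));
        }
        return m;
    }

    // Dot product of a row of A and a row of B'.
    // A column that is missing on either side contributes 0.
    static int dot(String rowOfA, String rowOfBt) {
        Map<String, Integer> bt = toMap(rowOfBt);
        int sum = 0;
        for (String entry : rowOfA.split(",")) {
            String[] parts = entry.split("_");
            Integer other = bt.get(parts[0]);
            if (other != null) {
                sum += Integer.parseInt(parts[1]) * other;
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        // Row 1 of A with row 1 of B': 1*1 + 2*3 + (-1)*(-4) = 11
        System.out.println(dot("1_1,2_2,3_-1", "3_-4,2_3,1_1"));
    }
}
```

With a map lookup each pair of rows costs time proportional to the number of entries rather than to its square, which may matter for wide matrices. For the 3x3 example the difference is negligible.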
Reduce phase:
```java
import java.io.IOException;
import java.util.Objects;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class ReduceMatrixMultiply extends Reducer<Text, Text, Text, Text> {

    private Text outKey = new Text();
    private Text outValue = new Text();

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Join all "column_value" entries of this result row with commas.
        StringBuilder sb = new StringBuilder();
        for (Text text : values) {
            sb.append(text).append(",");
        }
        // Drop the trailing comma.
        String result = null;
        if (sb.toString().endsWith(",")) {
            result = sb.substring(0, sb.length() - 1);
        }

        outKey.set(key);
        outValue.set(Objects.requireNonNull(result));
        context.write(outKey, outValue);
    }
}
```
Main:
The HDFS path of the file to place in the distributed cache is set here. It is passed in via args[0] and the file is then read in the map phase. The name after # is the alias given to the file.
```java
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Multiply {

    private static String hdfs = "hdfs://localhost:9000";

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", hdfs);
        try {
            Job job = Job.getInstance(conf, "matrix_multiply");
            // args[0]: HDFS path of the B' file; "#right_matrix" is its alias.
            // B' is a plain text file, so it is registered with addCacheFile
            // (addCacheArchive is meant for archives such as zip or tar files).
            job.addCacheFile(new URI(args[0] + "#right_matrix"));
            job.setJarByClass(Multiply.class);
            job.setMapperClass(MapMatrixMultiply.class);
            job.setReducerClass(ReduceMatrixMultiply.class);

            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);

            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            // args[1]: input path of matrix A, args[2]: output path of the product
            FileInputFormat.addInputPath(job, new Path(args[1]));
            FileOutputFormat.setOutputPath(job, new Path(args[2]));

            if (job.waitForCompletion(true)) {
                System.out.println("matrix multiply success");
            } else {
                System.out.println("matrix multiply fail");
            }
        } catch (Exception e) {
            System.out.println("Execution error: " + e.getMessage());
        }
    }
}
```
Run the code
```
hadoop jar matrix2.jar <distributed cache path> <input path> <output path>
```
This gives the result, and we are done:
```
1	3_2,2_-2,1_11
2	3_9,2_9,1_-7
3	3_1,2_-1,1_5
```
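As a sanity check, the result above can be compared with a plain in-memory multiplication of the two example matrices. The sketch below is independent of Hadoop and uses the same transpose-then-row-times-row idea; `MatrixProductCheck` is a hypothetical class name.

```java
import java.util.Arrays;

// Hypothetical in-memory check of the example; not part of the Hadoop jobs.
class MatrixProductCheck {

    // Swaps rows and columns of m.
    static int[][] transpose(int[][] m) {
        int[][] t = new int[m[0].length][m.length];
        for (int i = 0; i < m.length; i++) {
            for (int j = 0; j < m[0].length; j++) {
                t[j][i] = m[i][j];
            }
        }
        return t;
    }

    // Computes a * b by taking dot products of rows of a with rows of b's transpose.
    static int[][] multiply(int[][] a, int[][] b) {
        int[][] bt = transpose(b);
        int[][] c = new int[a.length][bt.length];
        for (int i = 0; i < a.length; i++) {
            for (int j = 0; j < bt.length; j++) {
                for (int k = 0; k < a[i].length; k++) {
                    c[i][j] += a[i][k] * bt[j][k];
                }
            }
        }
        return c;
    }

    public static void main(String[] args) {
        int[][] a = {{1, 2, -1}, {2, 1, 3}, {0, 3, 1}};
        int[][] b = {{1, 2, 3}, {3, -1, 0}, {-4, 2, 1}};
        // prints [[11, -2, 2], [-7, 9, 9], [5, -1, 1]]
        System.out.println(Arrays.deepToString(multiply(a, b)));
    }
}
```

Row 1 of the printed matrix is 11, -2, 2, which corresponds to the entries 1_11, 2_-2, 3_2 in the first line of the job output (listed there in reverse column order).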